This is page 2 of 3. Use http://codebase.md/stripe/agent-toolkit?page={x} to view the full context. # Directory Structure ``` ├── .github │ ├── ISSUE_TEMPLATE │ │ ├── bug_report.yml │ │ ├── config.yml │ │ └── feature_request.yml │ └── workflows │ ├── main.yml │ ├── npm_agent_toolkit_release.yml │ ├── npm_mcp_release.yml │ └── pypi_release.yml ├── .gitignore ├── .vscode │ ├── extensions.json │ ├── launch.json │ └── settings.json ├── CODE_OF_CONDUCT.md ├── CONTRIBUTING.md ├── evals │ ├── .env.example │ ├── .gitignore │ ├── braintrust_openai.ts │ ├── cases.ts │ ├── eval.ts │ ├── package.json │ ├── pnpm-lock.yaml │ ├── README.md │ ├── scorer.ts │ └── tsconfig.json ├── gemini-extension.json ├── LICENSE ├── modelcontextprotocol │ ├── .dxtignore │ ├── .gitignore │ ├── .node-version │ ├── .prettierrc │ ├── build-dxt.js │ ├── Dockerfile │ ├── eslint.config.mjs │ ├── jest.config.ts │ ├── manifest.json │ ├── package.json │ ├── pnpm-lock.yaml │ ├── README.md │ ├── src │ │ ├── index.ts │ │ └── test │ │ └── index.test.ts │ ├── stripe_icon.png │ └── tsconfig.json ├── python │ ├── .editorconfig │ ├── .flake8 │ ├── examples │ │ ├── crewai │ │ │ ├── .env.template │ │ │ ├── main.py │ │ │ └── README.md │ │ ├── langchain │ │ │ ├── __init__.py │ │ │ ├── .env.template │ │ │ ├── main.py │ │ │ └── README.md │ │ ├── openai │ │ │ ├── .env.template │ │ │ ├── customer_support │ │ │ │ ├── .env.template │ │ │ │ ├── emailer.py │ │ │ │ ├── env.py │ │ │ │ ├── main.py │ │ │ │ ├── pyproject.toml │ │ │ │ ├── README.md │ │ │ │ ├── repl.py │ │ │ │ └── support_agent.py │ │ │ ├── file_search │ │ │ │ ├── main.py │ │ │ │ └── README.md │ │ │ └── web_search │ │ │ ├── .env.template │ │ │ ├── main.py │ │ │ └── README.md │ │ └── strands │ │ └── main.py │ ├── Makefile │ ├── pyproject.toml │ ├── README.md │ ├── requirements.txt │ ├── stripe_agent_toolkit │ │ ├── __init__.py │ │ ├── api.py │ │ ├── configuration.py │ │ ├── crewai │ │ │ ├── tool.py │ │ │ └── toolkit.py │ │ ├── functions.py │ │ ├── langchain │ │ │ ├── tool.py │ │ │ └── toolkit.py │ │ ├── openai │ │ │ ├── hooks.py │ │ │ ├── tool.py │ │ │ └── toolkit.py │ │ ├── prompts.py │ │ ├── schema.py │ │ ├── strands │ │ │ ├── __init__.py │ │ │ ├── hooks.py │ │ │ ├── tool.py │ │ │ └── toolkit.py │ │ └── tools.py │ └── tests │ ├── __init__.py │ ├── test_configuration.py │ └── test_functions.py ├── README.md ├── SECURITY.md └── typescript ├── .gitignore ├── .prettierrc ├── eslint.config.mjs ├── examples │ ├── ai-sdk │ │ ├── .env.template │ │ ├── index.ts │ │ ├── package.json │ │ ├── README.md │ │ └── tsconfig.json │ ├── cloudflare │ │ ├── .dev.vars.example │ │ ├── .gitignore │ │ ├── biome.json │ │ ├── package.json │ │ ├── README.md │ │ ├── src │ │ │ ├── app.ts │ │ │ ├── imageGenerator.ts │ │ │ ├── index.ts │ │ │ ├── oauth.ts │ │ │ └── utils.ts │ │ ├── tsconfig.json │ │ ├── worker-configuration.d.ts │ │ └── wrangler.jsonc │ ├── langchain │ │ ├── .env.template │ │ ├── index.ts │ │ ├── package.json │ │ ├── README.md │ │ └── tsconfig.json │ └── openai │ ├── .env.template │ ├── index.ts │ ├── package.json │ ├── README.md │ └── tsconfig.json ├── jest.config.ts ├── package.json ├── pnpm-lock.yaml ├── pnpm-workspace.yaml ├── README.md ├── src │ ├── ai-sdk │ │ ├── index.ts │ │ ├── tool.ts │ │ └── toolkit.ts │ ├── cloudflare │ │ ├── index.ts │ │ └── README.md │ ├── langchain │ │ ├── index.ts │ │ ├── tool.ts │ │ └── toolkit.ts │ ├── modelcontextprotocol │ │ ├── index.ts │ │ ├── README.md │ │ ├── register-paid-tool.ts │ │ └── toolkit.ts │ ├── openai │ │ ├── index.ts │ │ └── toolkit.ts │ ├── shared │ │ ├── api.ts │ │ ├── balance │ │ │ └── retrieveBalance.ts │ │ ├── configuration.ts │ │ ├── coupons │ │ │ ├── createCoupon.ts │ │ │ └── listCoupons.ts │ │ ├── customers │ │ │ ├── createCustomer.ts │ │ │ └── listCustomers.ts │ │ ├── disputes │ │ │ ├── listDisputes.ts │ │ │ └── updateDispute.ts │ │ ├── documentation │ │ │ └── searchDocumentation.ts │ │ ├── invoiceItems │ │ │ └── createInvoiceItem.ts │ │ ├── invoices │ │ │ ├── createInvoice.ts │ │ │ ├── finalizeInvoice.ts │ │ │ └── listInvoices.ts │ │ ├── paymentIntents │ │ │ └── listPaymentIntents.ts │ │ ├── paymentLinks │ │ │ └── createPaymentLink.ts │ │ ├── prices │ │ │ ├── createPrice.ts │ │ │ └── listPrices.ts │ │ ├── products │ │ │ ├── createProduct.ts │ │ │ └── listProducts.ts │ │ ├── refunds │ │ │ └── createRefund.ts │ │ ├── subscriptions │ │ │ ├── cancelSubscription.ts │ │ │ ├── listSubscriptions.ts │ │ │ └── updateSubscription.ts │ │ └── tools.ts │ └── test │ ├── modelcontextprotocol │ │ └── register-paid-tool.test.ts │ └── shared │ ├── balance │ │ ├── functions.test.ts │ │ └── parameters.test.ts │ ├── configuration.test.ts │ ├── customers │ │ ├── functions.test.ts │ │ └── parameters.test.ts │ ├── disputes │ │ └── functions.test.ts │ ├── documentation │ │ ├── functions.test.ts │ │ └── parameters.test.ts │ ├── invoiceItems │ │ ├── functions.test.ts │ │ ├── parameters.test.ts │ │ └── prompts.test.ts │ ├── invoices │ │ ├── functions.test.ts │ │ ├── parameters.test.ts │ │ └── prompts.test.ts │ ├── paymentIntents │ │ ├── functions.test.ts │ │ ├── parameters.test.ts │ │ └── prompts.test.ts │ ├── paymentLinks │ │ ├── functions.test.ts │ │ ├── parameters.test.ts │ │ └── prompts.test.ts │ ├── prices │ │ ├── functions.test.ts │ │ └── parameters.test.ts │ ├── products │ │ ├── functions.test.ts │ │ └── parameters.test.ts │ ├── refunds │ │ ├── functions.test.ts │ │ └── parameters.test.ts │ └── subscriptions │ ├── functions.test.ts │ ├── parameters.test.ts │ └── prompts.test.ts ├── tsconfig.json └── tsup.config.ts ``` # Files -------------------------------------------------------------------------------- /typescript/src/ai-sdk/toolkit.ts: -------------------------------------------------------------------------------- ```typescript import StripeAPI from '../shared/api'; import tools from '../shared/tools'; import {isToolAllowed, type Configuration} from '../shared/configuration'; import type { CoreTool, LanguageModelV1StreamPart, Experimental_LanguageModelV1Middleware as LanguageModelV1Middleware, } from 'ai'; import StripeTool from './tool'; type StripeMiddlewareConfig = { billing?: { type?: 'token'; customer: string; meters: { input?: string; output?: string; }; }; }; class StripeAgentToolkit { private _stripe: StripeAPI; tools: {[key: string]: CoreTool}; constructor({ secretKey, configuration, }: { secretKey: string; configuration: Configuration; }) { this._stripe = new StripeAPI(secretKey, configuration.context); this.tools = {}; const context = configuration.context || {}; const filteredTools = tools(context).filter((tool) => isToolAllowed(tool, configuration) ); filteredTools.forEach((tool) => { // @ts-ignore this.tools[tool.method] = StripeTool( this._stripe, tool.method, tool.description, tool.parameters ); }); } middleware(config: StripeMiddlewareConfig): LanguageModelV1Middleware { const bill = async ({ promptTokens, completionTokens, }: { promptTokens: number; completionTokens: number; }) => { if (config.billing) { if (config.billing.meters.input) { await this._stripe.createMeterEvent({ event: config.billing.meters.input, customer: config.billing.customer, value: promptTokens.toString(), }); } if (config.billing.meters.output) { await this._stripe.createMeterEvent({ event: config.billing.meters.output, customer: config.billing.customer, value: completionTokens.toString(), }); } } }; return { wrapGenerate: async ({doGenerate}) => { const result = await doGenerate(); if (config.billing) { await bill(result.usage); } return result; }, wrapStream: async ({doStream}) => { const {stream, ...rest} = await doStream(); const transformStream = new TransformStream< LanguageModelV1StreamPart, LanguageModelV1StreamPart >({ async transform(chunk, controller) { if (chunk.type === 'finish') { if (config.billing) { await bill(chunk.usage); } } controller.enqueue(chunk); }, }); return { stream: stream.pipeThrough(transformStream), ...rest, }; }, }; } getTools(): {[key: string]: CoreTool} { return this.tools; } } export default StripeAgentToolkit; ``` -------------------------------------------------------------------------------- /evals/eval.ts: -------------------------------------------------------------------------------- ```typescript require("dotenv").config(); import { StripeAgentToolkit } from "../typescript/src/openai"; import type { ChatCompletion, ChatCompletionMessageParam, } from "openai/resources"; import { Eval } from "braintrust"; import { AssertionScorer, EvalCaseFunction, EvalInput } from "./scorer"; import { getEvalTestCases } from "./cases"; import braintrustOpenai from "./braintrust_openai"; // This is the core "workhorse" function that accepts an input and returns a response // which calls stripe agent tookit async function task(evalInput: EvalInput): Promise<EvalOutput> { const stripeAgentToolkit = new StripeAgentToolkit({ secretKey: process.env.STRIPE_SECRET_KEY!, configuration: { actions: { paymentLinks: { create: true, read: true, }, products: { create: true, read: true, }, prices: { create: true, read: true, }, coupons: { create: true, read: true, }, customers: { create: true, read: true, }, paymentIntents: { create: true, read: true, }, invoices: { create: true, read: true, }, invoiceItems: { create: true, read: true, }, refunds: { create: true, read: true, }, subscriptions: { read: true, update: true, }, balance: { read: true, }, disputes: { read: true, update: true, }, }, ...evalInput.toolkitConfigOverride, }, }); let messages: ChatCompletionMessageParam[] = [ { role: "user", content: evalInput.userPrompt, }, ]; let completion: ChatCompletion; const tools = stripeAgentToolkit.getTools(); while (true) { // eslint-disable-next-line no-await-in-loop completion = await braintrustOpenai.chat.completions.create({ model: "gpt-4o", messages, tools, }); const message = completion.choices[0].message; messages.push(message); if (message.tool_calls?.length! > 0) { // eslint-disable-next-line no-await-in-loop const toolMessages = await Promise.all( message.tool_calls!.map((tc) => stripeAgentToolkit.handleToolCall(tc)) ); messages = [...messages, ...toolMessages]; } else { break; } } return messages; } const BRAINTRUST_PROJECT = "agent-toolkit"; export type EvalOutput = ChatCompletionMessageParam[]; async function main() { await Eval<EvalInput, EvalOutput, EvalCaseFunction>(BRAINTRUST_PROJECT, { data: await getEvalTestCases(), task: async (input) => { const result = await task(input); return result; }, scores: [AssertionScorer], }); } main(); ``` -------------------------------------------------------------------------------- /typescript/src/test/shared/paymentIntents/functions.test.ts: -------------------------------------------------------------------------------- ```typescript import {listPaymentIntents} from '@/shared/paymentIntents/listPaymentIntents'; const Stripe = jest.fn().mockImplementation(() => ({ paymentIntents: { list: jest.fn(), }, })); let stripe: ReturnType<typeof Stripe>; beforeEach(() => { stripe = new Stripe('fake-api-key'); }); describe('listPaymentIntents', () => { it('should list payment intents and return them', async () => { const mockPaymentIntents = [ { id: 'pi_123456', customer: 'cus_123456', amount: 1000, status: 'succeeded', description: 'Test Payment Intent', }, ]; const context = {}; stripe.paymentIntents.list.mockResolvedValue({data: mockPaymentIntents}); const result = await listPaymentIntents(stripe, context, {}); expect(stripe.paymentIntents.list).toHaveBeenCalledWith({}, undefined); expect(result).toEqual(mockPaymentIntents); }); it('should list payment intents for a specific customer', async () => { const mockPaymentIntents = [ { id: 'pi_123456', customer: 'cus_123456', amount: 1000, status: 'succeeded', description: 'Test Payment Intent', }, ]; const context = {}; stripe.paymentIntents.list.mockResolvedValue({data: mockPaymentIntents}); const result = await listPaymentIntents(stripe, context, { customer: 'cus_123456', }); expect(stripe.paymentIntents.list).toHaveBeenCalledWith( { customer: 'cus_123456', }, undefined ); expect(result).toEqual(mockPaymentIntents); }); it('should specify the connected account if included in context', async () => { const mockPaymentIntents = [ { id: 'pi_123456', customer: 'cus_123456', amount: 1000, status: 'succeeded', description: 'Test Payment Intent', }, ]; const context = { account: 'acct_123456', }; stripe.paymentIntents.list.mockResolvedValue({data: mockPaymentIntents}); const result = await listPaymentIntents(stripe, context, {}); expect(stripe.paymentIntents.list).toHaveBeenCalledWith( {}, { stripeAccount: context.account, } ); expect(result).toEqual(mockPaymentIntents); }); it('should list payment intents for a specific customer if included in context', async () => { const mockPaymentIntents = [ { id: 'pi_123456', customer: 'cus_123456', amount: 1000, status: 'succeeded', description: 'Test Payment Intent', }, ]; const context = { customer: 'cus_123456', }; stripe.paymentIntents.list.mockResolvedValue({data: mockPaymentIntents}); const result = await listPaymentIntents(stripe, context, {}); expect(stripe.paymentIntents.list).toHaveBeenCalledWith( {customer: context.customer}, undefined ); expect(result).toEqual(mockPaymentIntents); }); }); ``` -------------------------------------------------------------------------------- /typescript/src/shared/coupons/createCoupon.ts: -------------------------------------------------------------------------------- ```typescript import Stripe from 'stripe'; import {z} from 'zod'; import type {Context} from '@/shared/configuration'; import type {Tool} from '@/shared/tools'; export const createCouponPrompt = (_context: Context = {}) => ` This tool will create a coupon in Stripe. It takes several arguments: - name (str): The name of the coupon. Only use one of percent_off or amount_off, not both: - percent_off (number, optional): The percentage discount to apply (between 0 and 100). - amount_off (number, optional): The amount to subtract from an invoice (in cents). Optional arguments for duration. Use if specific duration is desired, otherwise default to 'once'. - duration (str, optional): How long the discount will last ('once', 'repeating', or 'forever'). Defaults to 'once'. - duration_in_months (number, optional): The number of months the discount will last if duration is repeating. `; export const createCouponParameters = ( _context: Context = {} ): z.AnyZodObject => z.object({ name: z .string() .describe( 'Name of the coupon displayed to customers on invoices or receipts' ), percent_off: z .number() .min(0) .max(100) .optional() .describe( 'A positive float larger than 0, and smaller or equal to 100, that represents the discount the coupon will apply (required if amount_off is not passed)' ), amount_off: z .number() .describe( 'A positive integer representing the amount to subtract from an invoice total (required if percent_off is not passed)' ), currency: z .string() .optional() .default('USD') .describe( 'Three-letter ISO code for the currency of the amount_off parameter (required if amount_off is passed). Infer based on the amount_off. For example, if a coupon is $2 off, set currency to be USD.' ), duration: z .enum(['once', 'repeating', 'forever']) .default('once') .optional() .describe('How long the discount will last. Defaults to "once"'), duration_in_months: z .number() .optional() .describe( 'The number of months the discount will last if duration is repeating' ), }); export const createCouponAnnotations = () => ({ destructiveHint: false, idempotentHint: false, openWorldHint: true, readOnlyHint: false, title: 'Create coupon', }); export const createCoupon = async ( stripe: Stripe, context: Context, params: z.infer<ReturnType<typeof createCouponParameters>> ) => { try { const coupon = await stripe.coupons.create( params, context.account ? {stripeAccount: context.account} : undefined ); return {id: coupon.id}; } catch (error: any) { return `Failed to create coupon: ${error.message}`; } }; const tool = (context: Context): Tool => ({ method: 'create_coupon', name: 'Create Coupon', description: createCouponPrompt(context), parameters: createCouponParameters(context), annotations: createCouponAnnotations(), actions: { coupons: { create: true, }, }, execute: createCoupon, }); export default tool; ``` -------------------------------------------------------------------------------- /typescript/src/shared/disputes/updateDispute.ts: -------------------------------------------------------------------------------- ```typescript import Stripe from 'stripe'; import {z} from 'zod'; import type {Context} from '@/shared/configuration'; import type {Tool} from '@/shared/tools'; export const updateDisputePrompt = (_context: Context = {}) => ` When you receive a dispute, contacting your customer is always the best first step. If that doesn't work, you can submit evidence to help resolve the dispute in your favor. This tool helps. It takes the following arguments: - dispute (string): The ID of the dispute to update - evidence (object, optional): Evidence to upload for the dispute. - cancellation_policy_disclosure (string) - cancellation_rebuttal (string) - duplicate_charge_explanation (string) - uncategorized_text (string, optional): Any additional evidence or statements. - submit (boolean, optional): Whether to immediately submit evidence to the bank. If false, evidence is staged on the dispute. `; export const updateDisputeParameters = (_context: Context = {}) => z.object({ dispute: z.string().describe('The ID of the dispute to update'), evidence: z .object({ cancellation_policy_disclosure: z .string() .max(20000) .optional() .describe( 'An explanation of how and when the customer was shown your refund policy prior to purchase.' ), duplicate_charge_explanation: z .string() .max(20000) .optional() .describe( 'An explanation of the difference between the disputed charge versus the prior charge that appears to be a duplicate.' ), uncategorized_text: z .string() .max(20000) .optional() .describe('Any additional evidence or statements.'), }) .optional() .describe( 'Evidence to upload, to respond to a dispute. Updating any field in the hash will submit all fields in the hash for review.' ), submit: z .boolean() .optional() .describe( 'Whether to immediately submit evidence to the bank. If false, evidence is staged on the dispute.' ), }); export const updateDisputeAnnotations = () => ({ destructiveHint: false, idempotentHint: false, openWorldHint: true, readOnlyHint: false, title: 'Update dispute', }); export const updateDispute = async ( stripe: Stripe, context: Context, params: z.infer<ReturnType<typeof updateDisputeParameters>> ) => { try { const updateParams: Stripe.DisputeUpdateParams = { evidence: params.evidence, submit: params.submit, }; const updatedDispute = await stripe.disputes.update( params.dispute, updateParams, context.account ? {stripeAccount: context.account} : undefined ); return {id: updatedDispute.id}; } catch (error) { return 'Failed to update dispute'; } }; const tool = (context: Context): Tool => ({ method: 'update_dispute', name: 'Update Dispute', description: updateDisputePrompt(context), parameters: updateDisputeParameters(context), annotations: updateDisputeAnnotations(), actions: { disputes: { update: true, }, }, execute: updateDispute, }); export default tool; ``` -------------------------------------------------------------------------------- /typescript/src/test/shared/disputes/functions.test.ts: -------------------------------------------------------------------------------- ```typescript import {updateDispute} from '@/shared/disputes/updateDispute'; import {listDisputes} from '@/shared/disputes/listDisputes'; const Stripe = jest.fn().mockImplementation(() => ({ disputes: { update: jest.fn(), list: jest.fn(), }, })); let stripe: ReturnType<typeof Stripe>; beforeEach(() => { stripe = new Stripe('fake-api-key'); }); describe('updateDispute', () => { it('should update a dispute and return the id', async () => { const params = { dispute: 'dp_123456', evidence: { uncategorized_text: 'Test product', }, submit: true, }; const context = {}; const mockDispute = {id: 'dp_123456'}; stripe.disputes.update.mockResolvedValue(mockDispute); const result = await updateDispute(stripe, context, params); expect(stripe.disputes.update).toHaveBeenCalledWith( params.dispute, { evidence: params.evidence, submit: params.submit, }, undefined ); expect(result).toEqual({id: mockDispute.id}); }); it('should specify the connected account if included in context', async () => { const params = { dispute: 'dp_123456', evidence: { uncategorized_text: 'Test product', }, submit: true, }; const context = { account: 'acct_123456', }; const mockDispute = {id: 'dp_123456'}; stripe.disputes.update.mockResolvedValue(mockDispute); const result = await updateDispute(stripe, context, params); expect(stripe.disputes.update).toHaveBeenCalledWith( params.dispute, { evidence: params.evidence, submit: params.submit, }, { stripeAccount: context.account, } ); expect(result).toEqual({id: mockDispute.id}); }); }); describe('listDisputes', () => { it('should list disputes and return their ids', async () => { const mockDisputes = [{id: 'dp_123456'}, {id: 'dp_789012'}]; const context = {}; stripe.disputes.list.mockResolvedValue({data: mockDisputes}); const result = await listDisputes(stripe, context, {}); expect(stripe.disputes.list).toHaveBeenCalledWith({}, undefined); expect(result).toEqual(mockDisputes.map(({id}) => ({id}))); }); it('should specify the connected account if included in context', async () => { const mockDisputes = [{id: 'dp_123456'}, {id: 'dp_789012'}]; const context = { account: 'acct_123456', }; stripe.disputes.list.mockResolvedValue({data: mockDisputes}); const result = await listDisputes(stripe, context, {}); expect(stripe.disputes.list).toHaveBeenCalledWith( {}, { stripeAccount: context.account, } ); expect(result).toEqual(mockDisputes.map(({id}) => ({id}))); }); it('should pass through list parameters', async () => { const params = { charge: 'ch_123456', payment_intent: 'pi_123456', limit: 5, }; const mockDisputes = [{id: 'dp_123456'}, {id: 'dp_789012'}]; const context = {}; stripe.disputes.list.mockResolvedValue({data: mockDisputes}); const result = await listDisputes(stripe, context, params); expect(stripe.disputes.list).toHaveBeenCalledWith(params, undefined); expect(result).toEqual(mockDisputes.map(({id}) => ({id}))); }); }); ``` -------------------------------------------------------------------------------- /typescript/src/shared/subscriptions/updateSubscription.ts: -------------------------------------------------------------------------------- ```typescript import Stripe from 'stripe'; import {z} from 'zod'; import type {Context} from '@/shared/configuration'; import type {Tool} from '@/shared/tools'; export const updateSubscriptionPrompt = (_context: Context = {}): string => { return `This tool will update an existing subscription in Stripe. If changing an existing subscription item, the existing subscription item has to be set to deleted and the new one has to be added. It takes the following arguments: - subscription (str, required): The ID of the subscription to update. - proration_behavior (str, optional): Determines how to handle prorations when the subscription items change. Options: 'create_prorations', 'none', 'always_invoice', 'none_implicit'. - items (array, optional): A list of subscription items to update, add, or remove. Each item can have the following properties: - id (str, optional): The ID of the subscription item to modify. - price (str, optional): The ID of the price to switch to. - quantity (int, optional): The quantity of the plan to subscribe to. - deleted (bool, optional): Whether to delete this item. `; }; export const updateSubscription = async ( stripe: Stripe, context: Context, params: z.infer<ReturnType<typeof updateSubscriptionParameters>> ) => { try { const {subscription: subscriptionId, ...updateParams} = params; const subscription = await stripe.subscriptions.update( subscriptionId, updateParams, context.account ? {stripeAccount: context.account} : undefined ); return subscription; } catch (error) { return 'Failed to update subscription'; } }; export const updateSubscriptionParameters = ( _context: Context = {} ): z.AnyZodObject => { return z.object({ subscription: z.string().describe('The ID of the subscription to update.'), proration_behavior: z .enum(['create_prorations', 'none', 'always_invoice', 'none_implicit']) .optional() .describe( 'Determines how to handle prorations when the subscription items change.' ), items: z .array( z.object({ id: z .string() .optional() .describe('The ID of the subscription item to modify.'), price: z .string() .optional() .describe('The ID of the price to switch to.'), quantity: z .number() .int() .min(1) .optional() .describe('The quantity of the plan to subscribe to.'), deleted: z .boolean() .optional() .describe('Whether to delete this item.'), }) ) .optional() .describe('A list of subscription items to update, add, or remove.'), }); }; export const updateSubscriptionAnnotations = () => ({ destructiveHint: false, idempotentHint: false, openWorldHint: true, readOnlyHint: false, title: 'Update subscription', }); const tool = (context: Context): Tool => ({ method: 'update_subscription', name: 'Update Subscription', description: updateSubscriptionPrompt(context), parameters: updateSubscriptionParameters(context), annotations: updateSubscriptionAnnotations(), actions: { subscriptions: { update: true, }, }, execute: updateSubscription, }); export default tool; ``` -------------------------------------------------------------------------------- /python/stripe_agent_toolkit/prompts.py: -------------------------------------------------------------------------------- ```python CREATE_CUSTOMER_PROMPT = """ This tool will create a customer in Stripe. It takes two arguments: - name (str): The name of the customer. - email (str, optional): The email of the customer. """ LIST_CUSTOMERS_PROMPT = """ This tool will fetch a list of Customers from Stripe. It takes no input. """ CREATE_PRODUCT_PROMPT = """ This tool will create a product in Stripe. It takes two arguments: - name (str): The name of the product. - description (str, optional): The description of the product. """ LIST_PRODUCTS_PROMPT = """ This tool will fetch a list of Products from Stripe. It takes one optional argument: - limit (int, optional): The number of products to return. """ CREATE_PRICE_PROMPT = """ This tool will create a price in Stripe. If a product has not already been specified, a product should be created first. It takes three arguments: - product (str): The ID of the product to create the price for. - unit_amount (int): The unit amount of the price in cents. - currency (str): The currency of the price. """ LIST_PRICES_PROMPT = """ This tool will fetch a list of Prices from Stripe. It takes two arguments: - product (str, optional): The ID of the product to list prices for. - limit (int, optional): The number of prices to return. """ CREATE_PAYMENT_LINK_PROMPT = """ This tool will create a payment link in Stripe. It takes two arguments: - price (str): The ID of the price to create the payment link for. - quantity (int): The quantity of the product to include in the payment link. - redirect_url (string, optional): The URL the customer will be redirected to after the purchase is complete. """ LIST_INVOICES_PROMPT = """ This tool will list invoices in Stripe. It takes two arguments: - customer (str, optional): The ID of the customer to list the invoices for. - limit (int, optional): The number of prices to return. """ CREATE_INVOICE_PROMPT = """ This tool will create an invoice in Stripe. It takes one argument: - customer (str): The ID of the customer to create the invoice for. """ CREATE_INVOICE_ITEM_PROMPT = """ This tool will create an invoice item in Stripe. It takes two arguments: - customer (str): The ID of the customer to create the invoice item for. - price (str): The ID of the price to create the invoice item for. - invoice (str): The ID of the invoice to create the invoice item for. """ FINALIZE_INVOICE_PROMPT = """ This tool will finalize an invoice in Stripe. It takes one argument: - invoice (str): The ID of the invoice to finalize. """ RETRIEVE_BALANCE_PROMPT = """ This tool will retrieve the balance from Stripe. It takes no input. """ CREATE_REFUND_PROMPT = """ This tool will refund a payment intent in Stripe. It takes three arguments: - payment_intent (str): The ID of the payment intent to refund. - amount (int, optional): The amount to refund in cents. - reason (str, optional): The reason for the refund. """ LIST_PAYMENT_INTENTS_PROMPT = """ This tool will list payment intents in Stripe. It takes two arguments: - customer (str, optional): The ID of the customer to list payment intents for. - limit (int, optional): The number of payment intents to return. """ CREATE_BILLING_PORTAL_SESSION_PROMPT = """ This tool will create a billing portal session. It takes two arguments: - customer (str): The ID of the customer to create the invoice item for. - return_url (str, optional): The default URL to return to afterwards. """ ``` -------------------------------------------------------------------------------- /typescript/examples/cloudflare/src/index.ts: -------------------------------------------------------------------------------- ```typescript import {McpServer} from '@modelcontextprotocol/sdk/server/mcp.js'; import {z} from 'zod'; import { PaymentState, experimental_PaidMcpAgent as PaidMcpAgent, } from '@stripe/agent-toolkit/cloudflare'; import {generateImage} from './imageGenerator'; import {OAuthProvider} from '@cloudflare/workers-oauth-provider'; import app from './app'; type Bindings = Env; type Props = { userEmail: string; }; type State = PaymentState & {}; export class MyMCP extends PaidMcpAgent<Bindings, State, Props> { server = new McpServer({ name: 'Demo', version: '1.0.0', }); initialState: State = {}; async init() { this.server.tool('add', {a: z.number(), b: z.number()}, ({a, b}) => { return { content: [{type: 'text', text: `Result: ${a + b}`}], }; }); // One-time payment, then the tool is usable forever this.paidTool( 'buy_premium', 'Buy a premium account', {}, () => { return { content: [{type: 'text', text: `You now have a premium account!`}], }; }, { checkout: { success_url: 'http://localhost:4242/payment/success', line_items: [ { price: process.env.STRIPE_PRICE_ID_ONE_TIME_PAYMENT, quantity: 1, }, ], mode: 'payment', }, paymentReason: 'Open the checkout link in the browser to buy a premium account.', } ); // Subscription, then the tool is usable as long as the subscription is active this.paidTool( 'big_add', 'Add two numbers together', { a: z.number(), b: z.number(), }, ({a, b}) => { return { content: [{type: 'text', text: `Result: ${a + b}`}], }; }, { checkout: { success_url: 'http://localhost:4242/payment/success', line_items: [ { price: process.env.STRIPE_PRICE_ID_SUBSCRIPTION, quantity: 1, }, ], mode: 'subscription', }, paymentReason: 'You must pay a subscription to add two big numbers together.', } ); // Usage-based metered payments (Each tool call requires a payment) this.paidTool( 'generate_emoji', 'Generate an emoji given a single word (the `object` parameter describing the emoji)', { object: z.string().describe('one word'), }, ({object}) => { return { content: [{type: 'text', text: generateImage(object)}], }; }, { checkout: { success_url: 'http://localhost:4242/payment/success', line_items: [ { price: process.env.STRIPE_PRICE_ID_USAGE_BASED_SUBSCRIPTION, }, ], mode: 'subscription', }, meterEvent: 'image_generation', paymentReason: 'You get 3 free generations, then we charge 10 cents per generation.', } ); } } // Export the OAuth handler as the default export default new OAuthProvider({ apiRoute: '/sse', apiHandlers: { // @ts-ignore '/sse': MyMCP.serveSSE('/sse'), // @ts-ignore '/mcp': MyMCP.serve('/mcp'), }, // @ts-ignore defaultHandler: app, authorizeEndpoint: '/authorize', tokenEndpoint: '/token', clientRegistrationEndpoint: '/register', }); ``` -------------------------------------------------------------------------------- /python/stripe_agent_toolkit/strands/hooks.py: -------------------------------------------------------------------------------- ```python from typing import Any, Optional, Dict from ..api import StripeAPI class BillingHooks: """Billing hooks for Strands framework to track usage and create meter events.""" def __init__( self, stripe: StripeAPI, type: str, customer: str, meter: Optional[str] = None, meters: Optional[Dict[str, str]] = None ): """ Initialize billing hooks. Args: stripe: StripeAPI instance type: Type of billing - "outcome" or "token" customer: Customer ID for billing meter: Single meter ID for outcome-based billing meters: Dictionary of meter IDs for token-based billing (input/output) """ self.type = type self.stripe = stripe self.customer = customer self.meter = meter self.meters = meters or {} def on_start(self, context: Any = None) -> None: """Called when agent execution starts.""" pass def on_end(self, context: Any = None, output: Any = None, usage: Any = None) -> None: """ Called when agent execution ends. Args: context: Execution context (may contain usage information) output: Agent output usage: Usage information (tokens, etc.) """ if self.type == "outcome": # Create a single meter event for outcome-based billing if self.meter: self.stripe.create_meter_event(self.meter, self.customer) elif self.type == "token": # Create meter events for token-based billing if usage: # Try to extract token usage from different possible formats input_tokens = self._extract_input_tokens(usage, context) output_tokens = self._extract_output_tokens(usage, context) if input_tokens and self.meters.get("input"): self.stripe.create_meter_event( self.meters["input"], self.customer, str(input_tokens) ) if output_tokens and self.meters.get("output"): self.stripe.create_meter_event( self.meters["output"], self.customer, str(output_tokens) ) def on_error(self, context: Any = None, error: Exception = None) -> None: """Called when agent execution encounters an error.""" pass def _extract_input_tokens(self, usage: Any, context: Any = None) -> Optional[int]: """Extract input token count from usage information.""" if hasattr(usage, 'input_tokens'): return usage.input_tokens elif isinstance(usage, dict): return usage.get('input_tokens') or usage.get('prompt_tokens') elif context and hasattr(context, 'usage') and hasattr(context.usage, 'input_tokens'): return context.usage.input_tokens return None def _extract_output_tokens(self, usage: Any, context: Any = None) -> Optional[int]: """Extract output token count from usage information.""" if hasattr(usage, 'output_tokens'): return usage.output_tokens elif isinstance(usage, dict): return usage.get('output_tokens') or usage.get('completion_tokens') elif context and hasattr(context, 'usage') and hasattr(context.usage, 'output_tokens'): return context.usage.output_tokens return None ``` -------------------------------------------------------------------------------- /python/stripe_agent_toolkit/api.py: -------------------------------------------------------------------------------- ```python """Util that calls Stripe.""" from __future__ import annotations import json import stripe from typing import Optional from pydantic import BaseModel from .configuration import Context from .functions import ( create_customer, list_customers, create_product, list_products, create_price, list_prices, create_payment_link, list_invoices, create_invoice, create_invoice_item, finalize_invoice, retrieve_balance, create_refund, list_payment_intents, create_billing_portal_session, ) class StripeAPI(BaseModel): """ "Wrapper for Stripe API""" _context: Context def __init__(self, secret_key: str, context: Optional[Context]): super().__init__() self._context = context if context is not None else Context() stripe.api_key = secret_key stripe.set_app_info( "stripe-agent-toolkit-python", version="0.6.1", url="https://github.com/stripe/agent-toolkit", ) def create_meter_event(self, event: str, customer: str, value: Optional[str] = None) -> str: meter_event_data: dict = { "event_name": event, "payload": { "stripe_customer_id": customer, }, } if value is not None: meter_event_data["payload"]["value"] = value if self._context.get("account") is not None: account = self._context.get("account") if account is not None: meter_event_data["stripe_account"] = account stripe.billing.MeterEvent.create(**meter_event_data) def run(self, method: str, *args, **kwargs) -> str: if method == "create_customer": return json.dumps(create_customer(self._context, *args, **kwargs)) elif method == "list_customers": return json.dumps(list_customers(self._context, *args, **kwargs)) elif method == "create_product": return json.dumps(create_product(self._context, *args, **kwargs)) elif method == "list_products": return json.dumps(list_products(self._context, *args, **kwargs)) elif method == "create_price": return json.dumps(create_price(self._context, *args, **kwargs)) elif method == "list_prices": return json.dumps(list_prices(self._context, *args, **kwargs)) elif method == "create_payment_link": return json.dumps( create_payment_link(self._context, *args, **kwargs) ) elif method == "list_invoices": return json.dumps(list_invoices(self._context, *args, **kwargs)) elif method == "create_invoice": return json.dumps(create_invoice(self._context, *args, **kwargs)) elif method == "create_invoice_item": return json.dumps( create_invoice_item(self._context, *args, **kwargs) ) elif method == "finalize_invoice": return json.dumps(finalize_invoice(self._context, *args, **kwargs)) elif method == "retrieve_balance": return json.dumps(retrieve_balance(self._context, *args, **kwargs)) elif method == "create_refund": return json.dumps(create_refund(self._context, *args, **kwargs)) elif method == "list_payment_intents": return json.dumps( list_payment_intents(self._context, *args, **kwargs) ) elif method == "create_billing_portal_session": return json.dumps( create_billing_portal_session(self._context, *args, **kwargs) ) else: raise ValueError("Invalid method " + method) ``` -------------------------------------------------------------------------------- /typescript/examples/cloudflare/src/app.ts: -------------------------------------------------------------------------------- ```typescript // From: https://github.com/cloudflare/ai/blob/main/demos/remote-mcp-server/src/app.ts import {Hono} from 'hono'; import { layout, homeContent, parseApproveFormBody, renderAuthorizationRejectedContent, renderAuthorizationApprovedContent, renderLoggedInAuthorizeScreen, renderLoggedOutAuthorizeScreen, renderPaymentSuccessContent, } from './utils'; import type {OAuthHelpers} from '@cloudflare/workers-oauth-provider'; export type Bindings = Env & { OAUTH_PROVIDER: OAuthHelpers; }; const app = new Hono<{ Bindings: Bindings; }>(); // Render a basic homepage placeholder to make sure the app is up app.get('/', async (c) => { const content = await homeContent(c.req.raw); return c.html(layout(content, 'MCP Remote Auth Demo - Home')); }); // Render an authorization page // If the user is logged in, we'll show a form to approve the appropriate scopes // If the user is not logged in, we'll show a form to both login and approve the scopes app.get('/authorize', async (c) => { // We don't have an actual auth system, so to demonstrate both paths, you can // hard-code whether the user is logged in or not. We'll default to true // const isLoggedIn = false; const isLoggedIn = true; const oauthReqInfo = await c.env.OAUTH_PROVIDER.parseAuthRequest(c.req.raw); const oauthScopes = [ { name: 'read_profile', description: 'Read your basic profile information', }, {name: 'read_data', description: 'Access your stored data'}, {name: 'write_data', description: 'Create and modify your data'}, ]; if (isLoggedIn) { const content = await renderLoggedInAuthorizeScreen( oauthScopes, oauthReqInfo ); return c.html(layout(content, 'MCP Remote Auth Demo - Authorization')); } const content = await renderLoggedOutAuthorizeScreen( oauthScopes, oauthReqInfo ); return c.html(layout(content, 'MCP Remote Auth Demo - Authorization')); }); app.get('/payment/success', async (c) => { return c.html( layout( await renderPaymentSuccessContent(), 'MCP Remote Auth Demo - Payment Success' ) ); }); // The /authorize page has a form that will POST to /approve // This endpoint is responsible for validating any login information and // then completing the authorization request with the OAUTH_PROVIDER app.post('/approve', async (c) => { const {action, oauthReqInfo, email, password} = await parseApproveFormBody( await c.req.parseBody() ); if (!oauthReqInfo) { return c.html('INVALID LOGIN', 401); } // If the user needs to both login and approve, we should validate the login first if (action === 'login_approve') { // We'll allow any values for email and password for this demo // but you could validate them here // Ex: // if (email !== "[email protected]" || password !== "password") { // biome-ignore lint/correctness/noConstantCondition: This is a demo if (false) { return c.html( layout( await renderAuthorizationRejectedContent('/'), 'MCP Remote Auth Demo - Authorization Status' ) ); } } // The user must be successfully logged in and have approved the scopes, so we // can complete the authorization request const {redirectTo} = await c.env.OAUTH_PROVIDER.completeAuthorization({ request: oauthReqInfo, userId: email, metadata: { label: 'Test User', }, scope: oauthReqInfo.scope, props: { // Here, you can send data to the MCP server userEmail: email, }, }); // Store the redirect URL per email in KV somewhere c.env.OAUTH_KV.put(email, redirectTo); return c.html( layout( await renderAuthorizationApprovedContent(redirectTo), 'MCP Remote Auth Demo - Authorization Status' ) ); }); export default app; ``` -------------------------------------------------------------------------------- /typescript/examples/cloudflare/src/oauth.ts: -------------------------------------------------------------------------------- ```typescript // From: https://github.com/cloudflare/ai/blob/main/demos/remote-mcp-server/src/app.ts import {Hono} from 'hono'; import { layout, homeContent, parseApproveFormBody, renderAuthorizationRejectedContent, renderAuthorizationApprovedContent, renderLoggedInAuthorizeScreen, renderLoggedOutAuthorizeScreen, renderPaymentSuccessContent, } from './utils'; import type {OAuthHelpers} from '@cloudflare/workers-oauth-provider'; export type Bindings = Env & { OAUTH_PROVIDER: OAuthHelpers; }; const app = new Hono<{ Bindings: Bindings; }>(); // Render a basic homepage placeholder to make sure the app is up app.get('/', async (c) => { const content = await homeContent(c.req.raw); return c.html(layout(content, 'MCP Remote Auth Demo - Home')); }); // Render an authorization page // If the user is logged in, we'll show a form to approve the appropriate scopes // If the user is not logged in, we'll show a form to both login and approve the scopes app.get('/authorize', async (c) => { // We don't have an actual auth system, so to demonstrate both paths, you can // hard-code whether the user is logged in or not. We'll default to true // const isLoggedIn = false; const isLoggedIn = true; const oauthReqInfo = await c.env.OAUTH_PROVIDER.parseAuthRequest(c.req.raw); const oauthScopes = [ { name: 'read_profile', description: 'Read your basic profile information', }, {name: 'read_data', description: 'Access your stored data'}, {name: 'write_data', description: 'Create and modify your data'}, ]; if (isLoggedIn) { const content = await renderLoggedInAuthorizeScreen( oauthScopes, oauthReqInfo ); return c.html(layout(content, 'MCP Remote Auth Demo - Authorization')); } const content = await renderLoggedOutAuthorizeScreen( oauthScopes, oauthReqInfo ); return c.html(layout(content, 'MCP Remote Auth Demo - Authorization')); }); app.get('/payment/success', async (c) => { return c.html( layout( await renderPaymentSuccessContent(), 'MCP Remote Auth Demo - Payment Success' ) ); }); // The /authorize page has a form that will POST to /approve // This endpoint is responsible for validating any login information and // then completing the authorization request with the OAUTH_PROVIDER app.post('/approve', async (c) => { const {action, oauthReqInfo, email, password} = await parseApproveFormBody( await c.req.parseBody() ); if (!oauthReqInfo) { return c.html('INVALID LOGIN', 401); } // If the user needs to both login and approve, we should validate the login first if (action === 'login_approve') { // We'll allow any values for email and password for this demo // but you could validate them here // Ex: // if (email !== "[email protected]" || password !== "password") { // biome-ignore lint/correctness/noConstantCondition: This is a demo if (false) { return c.html( layout( await renderAuthorizationRejectedContent('/'), 'MCP Remote Auth Demo - Authorization Status' ) ); } } // The user must be successfully logged in and have approved the scopes, so we // can complete the authorization request const {redirectTo} = await c.env.OAUTH_PROVIDER.completeAuthorization({ request: oauthReqInfo, userId: email, metadata: { label: 'Test User', }, scope: oauthReqInfo.scope, props: { // Here, you can send data to the MCP server userEmail: email, }, }); // Store the redirect URL per email in KV somewhere c.env.OAUTH_KV.put(email, redirectTo); return c.html( layout( await renderAuthorizationApprovedContent(redirectTo), 'MCP Remote Auth Demo - Authorization Status' ) ); }); export default app; ``` -------------------------------------------------------------------------------- /modelcontextprotocol/src/index.ts: -------------------------------------------------------------------------------- ```typescript #!/usr/bin/env node import {StripeAgentToolkit} from '@stripe/agent-toolkit/modelcontextprotocol'; import {StdioServerTransport} from '@modelcontextprotocol/sdk/server/stdio.js'; import {green, red, yellow} from 'colors'; type ToolkitConfig = { actions: { [product: string]: {[action: string]: boolean}; }; context?: { account?: string; mode: 'modelcontextprotocol'; }; }; type Options = { tools?: string[]; apiKey?: string; stripeAccount?: string; }; const ACCEPTED_ARGS = ['api-key', 'tools', 'stripe-account']; const ACCEPTED_TOOLS = [ 'coupons.create', 'coupons.read', 'customers.create', 'customers.read', 'products.create', 'products.read', 'prices.create', 'prices.read', 'paymentLinks.create', 'invoices.create', 'invoices.read', 'invoices.update', 'invoiceItems.create', 'balance.read', 'refunds.create', 'paymentIntents.read', 'subscriptions.read', 'subscriptions.update', 'disputes.read', 'disputes.update', 'documentation.read', ]; export function parseArgs(args: string[]): Options { const options: Options = {}; args.forEach((arg) => { if (arg.startsWith('--')) { const [key, value] = arg.slice(2).split('='); if (key == 'tools') { options.tools = value.split(','); } else if (key == 'api-key') { if (!value.startsWith('sk_') && !value.startsWith('rk_')) { throw new Error('API key must start with "sk_" or "rk_".'); } options.apiKey = value; } else if (key == 'stripe-account') { // Validate api-key format if (!value.startsWith('acct_')) { throw new Error('Stripe account must start with "acct_".'); } options.stripeAccount = value; } else { throw new Error( `Invalid argument: ${key}. Accepted arguments are: ${ACCEPTED_ARGS.join( ', ' )}` ); } } }); // Check if required tools arguments is present if (!options.tools) { throw new Error('The --tools arguments must be provided.'); } // Validate tools against accepted enum values options.tools.forEach((tool: string) => { if (tool == 'all') { return; } if (!ACCEPTED_TOOLS.includes(tool.trim())) { throw new Error( `Invalid tool: ${tool}. Accepted tools are: ${ACCEPTED_TOOLS.join( ', ' )}` ); } }); // Check if API key is either provided in args or set in environment variables const apiKey = options.apiKey || process.env.STRIPE_SECRET_KEY; if (!apiKey) { throw new Error( 'Stripe API key not provided. Please either pass it as an argument --api-key=$KEY or set the STRIPE_SECRET_KEY environment variable.' ); } options.apiKey = apiKey; return options; } function handleError(error: any) { console.error(red('\n🚨 Error initializing Stripe MCP server:\n')); console.error(yellow(` ${error.message}\n`)); } export async function main() { const options = parseArgs(process.argv.slice(2)); // Create the StripeAgentToolkit instance const selectedTools = options.tools!; const configuration: ToolkitConfig = {actions: {}}; if (selectedTools.includes('all')) { ACCEPTED_TOOLS.forEach((tool) => { const [product, action] = tool.split('.'); configuration.actions[product] = { ...configuration.actions[product], [action]: true, }; }); } else { selectedTools.forEach((tool: any) => { const [product, action] = tool.split('.'); configuration.actions[product] = {[action]: true}; }); } configuration.context = { mode: 'modelcontextprotocol', }; // Append stripe account to configuration if provided if (options.stripeAccount) { configuration.context.account = options.stripeAccount; } const server = new StripeAgentToolkit({ secretKey: options.apiKey!, configuration: configuration, }); const transport = new StdioServerTransport(); await server.connect(transport); // We use console.error instead of console.log since console.log will output to stdio, which will confuse the MCP server console.error(green('✅ Stripe MCP Server running on stdio')); } if (require.main === module) { main().catch((error) => { handleError(error); }); } ``` -------------------------------------------------------------------------------- /python/stripe_agent_toolkit/tools.py: -------------------------------------------------------------------------------- ```python from typing import Dict, List from .prompts import ( CREATE_CUSTOMER_PROMPT, LIST_CUSTOMERS_PROMPT, CREATE_PRODUCT_PROMPT, LIST_PRODUCTS_PROMPT, CREATE_PRICE_PROMPT, LIST_PRICES_PROMPT, CREATE_PAYMENT_LINK_PROMPT, LIST_INVOICES_PROMPT, CREATE_INVOICE_PROMPT, CREATE_INVOICE_ITEM_PROMPT, FINALIZE_INVOICE_PROMPT, RETRIEVE_BALANCE_PROMPT, CREATE_REFUND_PROMPT, LIST_PAYMENT_INTENTS_PROMPT, CREATE_BILLING_PORTAL_SESSION_PROMPT, ) from .schema import ( CreateCustomer, ListCustomers, CreateProduct, ListProducts, CreatePrice, ListPrices, CreatePaymentLink, ListInvoices, CreateInvoice, CreateInvoiceItem, FinalizeInvoice, RetrieveBalance, CreateRefund, ListPaymentIntents, CreateBillingPortalSession, ) tools: List[Dict] = [ { "method": "create_customer", "name": "Create Customer", "description": CREATE_CUSTOMER_PROMPT, "args_schema": CreateCustomer, "actions": { "customers": { "create": True, } }, }, { "method": "list_customers", "name": "List Customers", "description": LIST_CUSTOMERS_PROMPT, "args_schema": ListCustomers, "actions": { "customers": { "read": True, } }, }, { "method": "create_product", "name": "Create Product", "description": CREATE_PRODUCT_PROMPT, "args_schema": CreateProduct, "actions": { "products": { "create": True, } }, }, { "method": "list_products", "name": "List Products", "description": LIST_PRODUCTS_PROMPT, "args_schema": ListProducts, "actions": { "products": { "read": True, } }, }, { "method": "create_price", "name": "Create Price", "description": CREATE_PRICE_PROMPT, "args_schema": CreatePrice, "actions": { "prices": { "create": True, } }, }, { "method": "list_prices", "name": "List Prices", "description": LIST_PRICES_PROMPT, "args_schema": ListPrices, "actions": { "prices": { "read": True, } }, }, { "method": "create_payment_link", "name": "Create Payment Link", "description": CREATE_PAYMENT_LINK_PROMPT, "args_schema": CreatePaymentLink, "actions": { "payment_links": { "create": True, } }, }, { "method": "list_invoices", "name": "List Invoices", "description": LIST_INVOICES_PROMPT, "args_schema": ListInvoices, "actions": { "invoices": { "read": True, } }, }, { "method": "create_invoice", "name": "Create Invoice", "description": CREATE_INVOICE_PROMPT, "args_schema": CreateInvoice, "actions": { "invoices": { "create": True, } }, }, { "method": "create_invoice_item", "name": "Create Invoice Item", "description": CREATE_INVOICE_ITEM_PROMPT, "args_schema": CreateInvoiceItem, "actions": { "invoice_items": { "create": True, } }, }, { "method": "finalize_invoice", "name": "Finalize Invoice", "description": FINALIZE_INVOICE_PROMPT, "args_schema": FinalizeInvoice, "actions": { "invoices": { "update": True, } }, }, { "method": "retrieve_balance", "name": "Retrieve Balance", "description": RETRIEVE_BALANCE_PROMPT, "args_schema": RetrieveBalance, "actions": { "balance": { "read": True, } }, }, { "method": "create_refund", "name": "Create Refund", "description": CREATE_REFUND_PROMPT, "args_schema": CreateRefund, "actions": { "refunds": { "create": True, } }, }, { "method": "list_payment_intents", "name": "List Payment Intents", "description": LIST_PAYMENT_INTENTS_PROMPT, "args_schema": ListPaymentIntents, "actions": { "payment_intents": { "read": True, } }, }, { "method": "create_billing_portal_session", "name": "Create Billing Portal Session", "description": CREATE_BILLING_PORTAL_SESSION_PROMPT, "args_schema": CreateBillingPortalSession, "actions": { "billing_portal_sessions": { "create": True, } }, }, ] ``` -------------------------------------------------------------------------------- /typescript/src/test/shared/invoices/functions.test.ts: -------------------------------------------------------------------------------- ```typescript import {createInvoice} from '@/shared/invoices/createInvoice'; import {listInvoices} from '@/shared/invoices/listInvoices'; import {finalizeInvoice} from '@/shared/invoices/finalizeInvoice'; const Stripe = jest.fn().mockImplementation(() => ({ invoices: { create: jest.fn(), finalizeInvoice: jest.fn(), retrieve: jest.fn(), list: jest.fn(), }, })); let stripe: ReturnType<typeof Stripe>; beforeEach(() => { stripe = new Stripe('fake-api-key'); }); describe('createInvoice', () => { it('should create an invoice and return it', async () => { const params = { customer: 'cus_123456', days_until_due: 30, }; const mockInvoice = {id: 'in_123456', customer: 'cus_123456'}; const context = {}; stripe.invoices.create.mockResolvedValue(mockInvoice); const result = await createInvoice(stripe, context, params); expect(stripe.invoices.create).toHaveBeenCalledWith( {...params, collection_method: 'send_invoice'}, undefined ); expect(result).toEqual(mockInvoice); }); it('should specify the connected account if included in context', async () => { const params = { customer: 'cus_123456', days_until_due: 30, }; const mockInvoice = {id: 'in_123456', customer: 'cus_123456'}; const context = { account: 'acct_123456', }; stripe.invoices.create.mockResolvedValue(mockInvoice); const result = await createInvoice(stripe, context, params); expect(stripe.invoices.create).toHaveBeenCalledWith( { ...params, collection_method: 'send_invoice', }, {stripeAccount: context.account} ); expect(result).toEqual(mockInvoice); }); it('should create an invoice with a customer if included in context', async () => { const params = { days_until_due: 30, }; const mockInvoice = {id: 'in_123456', customer: 'cus_123456'}; const context = { customer: 'cus_123456', }; stripe.invoices.create.mockResolvedValue(mockInvoice); const result = await createInvoice(stripe, context, params); expect(stripe.invoices.create).toHaveBeenCalledWith( { ...params, customer: context.customer, collection_method: 'send_invoice', }, undefined ); expect(result).toEqual(mockInvoice); }); }); describe('listInvoices', () => { it('should list invoices and return them', async () => { const mockInvoices = [ {id: 'in_123456', customer: 'cus_123456'}, {id: 'in_789012', customer: 'cus_789012'}, ]; const context = {}; stripe.invoices.list.mockResolvedValue({data: mockInvoices}); const result = await listInvoices(stripe, context, {}); expect(stripe.invoices.list).toHaveBeenCalledWith({}, undefined); expect(result).toEqual(mockInvoices); }); it('should specify the connected account if included in context', async () => { const mockInvoices = [ {id: 'in_123456', customer: 'cus_123456'}, {id: 'in_789012', customer: 'cus_789012'}, ]; const context = { account: 'acct_123456', }; stripe.invoices.list.mockResolvedValue({data: mockInvoices}); const result = await listInvoices(stripe, context, {}); expect(stripe.invoices.list).toHaveBeenCalledWith( {}, {stripeAccount: context.account} ); expect(result).toEqual(mockInvoices); }); it('should list invoices for a specific customer', async () => { const mockInvoices = [ {id: 'in_123456', customer: 'cus_123456'}, {id: 'in_789012', customer: 'cus_789012'}, ]; const context = { customer: 'cus_123456', }; stripe.invoices.list.mockResolvedValue({data: mockInvoices}); const result = await listInvoices(stripe, context, {}); expect(stripe.invoices.list).toHaveBeenCalledWith( {customer: context.customer}, undefined ); expect(result).toEqual(mockInvoices); }); }); describe('finalizeInvoice', () => { it('should finalize an invoice and return it', async () => { const invoiceId = 'in_123456'; const mockInvoice = {id: invoiceId, customer: 'cus_123456'}; const context = {}; stripe.invoices.finalizeInvoice.mockResolvedValue(mockInvoice); const result = await finalizeInvoice(stripe, context, {invoice: invoiceId}); expect(stripe.invoices.finalizeInvoice).toHaveBeenCalledWith( invoiceId, undefined ); expect(result).toEqual(mockInvoice); }); it('should specify the connected account if included in context', async () => { const invoiceId = 'in_123456'; const mockInvoice = {id: invoiceId, customer: 'cus_123456'}; const context = { account: 'acct_123456', }; stripe.invoices.finalizeInvoice.mockResolvedValue(mockInvoice); const result = await finalizeInvoice(stripe, context, {invoice: invoiceId}); expect(stripe.invoices.finalizeInvoice).toHaveBeenCalledWith(invoiceId, { stripeAccount: context.account, }); expect(result).toEqual(mockInvoice); }); }); ``` -------------------------------------------------------------------------------- /python/stripe_agent_toolkit/schema.py: -------------------------------------------------------------------------------- ```python from typing import Optional from pydantic import BaseModel, Field class CreateCustomer(BaseModel): """Schema for the ``create_customer`` operation.""" name: str = Field( ..., description="The name of the customer.", ) email: Optional[str] = Field( None, description="The email of the customer.", ) class ListCustomers(BaseModel): """Schema for the ``list_customers`` operation.""" limit: Optional[int] = Field( None, description=( "A limit on the number of objects to be returned." " Limit can range between 1 and 100." ), ) email: Optional[str] = Field( None, description=( "A case-sensitive filter on the list based on" " the customer's email field. The value must be a string." ), ) class CreateProduct(BaseModel): """Schema for the ``create_product`` operation.""" name: str = Field( ..., description="The name of the product.", ) description: Optional[str] = Field( None, description="The description of the product.", ) class ListProducts(BaseModel): """Schema for the ``list_products`` operation.""" limit: Optional[int] = Field( None, description=( "A limit on the number of objects to be returned." " Limit can range between 1 and 100, and the default is 10." ), ) class CreatePrice(BaseModel): """Schema for the ``create_price`` operation.""" product: str = Field( ..., description="The ID of the product to create the price for." ) unit_amount: int = Field( ..., description="The unit amount of the price in cents.", ) currency: str = Field( ..., description="The currency of the price.", ) class ListPrices(BaseModel): """Schema for the ``list_prices`` operation.""" product: Optional[str] = Field( None, description="The ID of the product to list prices for.", ) limit: Optional[int] = Field( None, description=( "A limit on the number of objects to be returned." " Limit can range between 1 and 100, and the default is 10." ), ) class CreatePaymentLink(BaseModel): """Schema for the ``create_payment_link`` operation.""" price: str = Field( ..., description="The ID of the price to create the payment link for.", ) quantity: int = Field( ..., description="The quantity of the product to include.", ) redirect_url: Optional[str] = Field( None, description="The URL the customer will be redirected to after the purchase is complete.", ) class ListInvoices(BaseModel): """Schema for the ``list_invoices`` operation.""" customer: Optional[str] = Field( None, description="The ID of the customer to list invoices for.", ) limit: Optional[int] = Field( None, description=( "A limit on the number of objects to be returned." " Limit can range between 1 and 100, and the default is 10." ), ) class CreateInvoice(BaseModel): """Schema for the ``create_invoice`` operation.""" customer: str = Field( ..., description="The ID of the customer to create the invoice for." ) days_until_due: Optional[int] = Field( None, description="The number of days until the invoice is due.", ) class CreateInvoiceItem(BaseModel): """Schema for the ``create_invoice_item`` operation.""" customer: str = Field( ..., description="The ID of the customer to create the invoice item for.", ) price: str = Field( ..., description="The ID of the price for the item.", ) invoice: str = Field( ..., description="The ID of the invoice to create the item for.", ) class FinalizeInvoice(BaseModel): """Schema for the ``finalize_invoice`` operation.""" invoice: str = Field( ..., description="The ID of the invoice to finalize.", ) class RetrieveBalance(BaseModel): """Schema for the ``retrieve_balance`` operation.""" pass class CreateRefund(BaseModel): """Schema for the ``create_refund`` operation.""" payment_intent: str = Field( ..., description="The ID of the PaymentIntent to refund.", ) amount: Optional[int] = Field( ..., description="The amount to refund in cents.", ) class ListPaymentIntents(BaseModel): """Schema for the ``list_payment_intents`` operation.""" customer: Optional[str] = Field( None, description="The ID of the customer to list payment intents for.", ) limit: Optional[int] = Field( None, description=( "A limit on the number of objects to be returned." " Limit can range between 1 and 100." ), ) class CreateBillingPortalSession(BaseModel): """Schema for the ``create_billing_portal_session`` operation.""" customer: str = Field( None, description="The ID of the customer to create the billing portal session for.", ) return_url: Optional[str] = Field( None, description=( "The default URL to return to afterwards." ), ) ``` -------------------------------------------------------------------------------- /typescript/src/modelcontextprotocol/register-paid-tool.ts: -------------------------------------------------------------------------------- ```typescript import {z, type ZodRawShape} from 'zod'; import type {McpServer} from '@modelcontextprotocol/sdk/server/mcp.js'; import {ToolCallback} from '@modelcontextprotocol/sdk/server/mcp.js'; import type {CallToolResult} from '@modelcontextprotocol/sdk/types.js'; import Stripe from 'stripe'; /* * This supports one-time payment, subscription, usage-based metered payment. * For usage-based, set a `meterEvent` */ export type PaidToolOptions = { paymentReason: string; meterEvent?: string; stripeSecretKey: string; userEmail: string; checkout: Stripe.Checkout.SessionCreateParams; }; export async function registerPaidTool<Args extends ZodRawShape>( mcpServer: McpServer, toolName: string, toolDescription: string, paramsSchema: Args, // @ts-ignore: The typescript compiler complains this is an infinitely deep type paidCallback: ToolCallback<Args>, options: PaidToolOptions ) { const priceId = options.checkout.line_items?.find((li) => li.price)?.price; if (!priceId) { throw new Error( 'Price ID is required for a paid MCP tool. Learn more about prices: https://docs.stripe.com/products-prices/how-products-and-prices-work' ); } const stripe = new Stripe(options.stripeSecretKey, { appInfo: { name: 'stripe-agent-toolkit-mcp-payments', version: '0.7.11', url: 'https://github.com/stripe/agent-toolkit', }, }); const getCurrentCustomerID = async () => { const customers = await stripe.customers.list({ email: options.userEmail, }); let customerId: null | string = null; if (customers.data.length > 0) { customerId = customers.data.find((customer) => { return customer.email === options.userEmail; })?.id || null; } if (!customerId) { const customer = await stripe.customers.create({ email: options.userEmail, }); customerId = customer.id; } return customerId; }; const isToolPaidFor = async (toolName: string, customerId: string) => { // Check for paid checkout session for this tool (by metadata) const sessions = await stripe.checkout.sessions.list({ customer: customerId, limit: 100, }); const paidSession = sessions.data.find( (session) => session.metadata?.toolName === toolName && session.payment_status === 'paid' ); if (paidSession?.subscription) { // Check for active subscription for the priceId const subs = await stripe.subscriptions.list({ customer: customerId || '', status: 'active', }); const activeSub = subs.data.find((sub) => sub.items.data.find((item) => item.price.id === priceId) ); if (activeSub) { return true; } } if (paidSession) { return true; } return false; }; const createCheckoutSession = async ( paymentType: string, customerId: string ): Promise<CallToolResult | null> => { try { const session = await stripe.checkout.sessions.create({ ...options.checkout, metadata: { ...options.checkout.metadata, toolName, }, customer: customerId || undefined, }); const result = { status: 'payment_required', data: { paymentType, checkoutUrl: session.url, paymentReason: options.paymentReason, }, }; return { content: [ { type: 'text', text: JSON.stringify(result), } as {type: 'text'; text: string}, ], }; } catch (error: unknown) { let errMsg = 'Unknown error'; if (typeof error === 'object' && error !== null) { if ( 'raw' in error && typeof (error as {raw?: {message?: string}}).raw?.message === 'string' ) { errMsg = (error as {raw: {message: string}}).raw.message; } else if ( 'message' in error && typeof (error as {message?: string}).message === 'string' ) { errMsg = (error as {message: string}).message; } } console.error('Error creating stripe checkout session', errMsg); return { content: [ { type: 'text', text: JSON.stringify({ status: 'error', error: errMsg, }), } as {type: 'text'; text: string}, ], isError: true, }; } }; const recordUsage = async (customerId: string) => { if (!options.meterEvent) return; await stripe.billing.meterEvents.create({ event_name: options.meterEvent, payload: { stripe_customer_id: customerId, value: '1', }, }); }; // biome-ignore lint/suspicious/noExplicitAny: <explanation> const callback = async (args: any, extra: any): Promise<CallToolResult> => { const customerId = await getCurrentCustomerID(); const paidForTool = await isToolPaidFor(toolName, customerId); const paymentType = options.meterEvent ? 'usageBased' : 'oneTimeSubscription'; if (!paidForTool) { const checkoutResult = await createCheckoutSession( paymentType, customerId ); if (checkoutResult) return checkoutResult; } if (paymentType === 'usageBased') { await recordUsage(customerId); } // @ts-ignore: The typescript compiler complains this is an infinitely deep type return paidCallback(args, extra); }; // @ts-ignore: The typescript compiler complains this is an infinitely deep type mcpServer.tool(toolName, toolDescription, paramsSchema, callback as any); await Promise.resolve(); } ``` -------------------------------------------------------------------------------- /evals/scorer.ts: -------------------------------------------------------------------------------- ```typescript require("dotenv").config(); import { ClosedQA } from "autoevals"; import every from "lodash/every"; import braintrustOpenai from "./braintrust_openai"; import { EvalOutput } from "./eval"; import { ChatCompletionMessageParam } from "openai/resources/chat/completions.mjs"; import { ChatCompletionMessageToolCall } from "openai/resources/chat/completions.mjs"; import { Configuration as StripeAgentToolkitConfig } from "../typescript/src/shared/configuration"; import isEqual from "lodash/isEqual"; /* * EvalInput is what is passed into the agent. * It contains a userPrompt and configuration that can be * used to override the toolkit configuration. */ export type EvalInput = { toolkitConfigOverride: StripeAgentToolkitConfig; userPrompt: string; }; /* * EvalCaseFunction is the helper function that is used to * run assertions on the output of the agent. It does some * parsing of the raw completetion messages and tool calls * to make it easier to write assertions. */ export type EvalCaseFunction = ({ toolCalls, messages, assistantMessages, }: { toolCalls: ChatCompletionMessageToolCall[]; messages: ChatCompletionMessageParam[]; assistantMessages: string[]; }) => Array<AssertionResult | Promise<AssertionResult>>; export const AssertionScorer = async ({ output: responseMessages, expected: evalCaseFunction, }: { output: EvalOutput; expected: EvalCaseFunction; }) => { const toolCalls = responseMessages.flatMap((m) => { if ("tool_calls" in m && m.tool_calls) { return m.tool_calls; } else { return []; } }); const assistantMessages = responseMessages .filter((m) => m.role === "assistant") .map((m) => (typeof m.content === "string" ? m.content : "")); const rawResults = evalCaseFunction({ toolCalls, messages: responseMessages, assistantMessages, }); const assertionResults = await Promise.all(rawResults); const allPassed = every(assertionResults, (r) => r.status === "passed"); return { name: "Assertions Score", score: allPassed ? 1 : 0, metadata: { assertionResults, }, }; }; /* Below are assertion functions that can be used to evaluate the output of the agent. Similar to test framework helpers like Jest. */ export type AssertionResult = { status: "passed" | "failed"; assertion_type: string; expected?: string; actualValue?: string; message?: string; }; /** * Uses an LLM call to classify if a substring is semantically contained in a text. * @param text1 The full text you want to check against * @param text2 The string you want to check if it is contained in the text */ export async function semanticContains({ text1, text2, }: { text1: string; text2: string; }): Promise<AssertionResult> { const system = ` You are a highly intelligent AI that can determine if a piece of text semantically contains another piece of text. You will be given two pieces of text and you need to determine if the first piece of text semantically contains the second piece of text. Answer with just "yes" or "no". `; const completion = await braintrustOpenai.chat.completions.create({ model: "gpt-4o", messages: [ { role: "system", content: system }, { role: "user", content: `Text 1: ${text1}\n\nText 2: ${text2}\n\nDoes Text 1 semantically contain Text 2? Answer with just "yes" or "no".`, }, ], }); const response = completion.choices[0].message.content?.toLowerCase(); return { status: response === "yes" ? "passed" : "failed", assertion_type: "semantic_contains", expected: text2, actualValue: text1, }; } export const expectToolCall = ( actualToolCalls: ChatCompletionMessageToolCall[], expectedToolCalls: string[] ): AssertionResult => { const actualToolCallNames = actualToolCalls.map((tc) => tc.function.name); const pass = actualToolCallNames.some((tc) => expectedToolCalls.includes(tc)); return { status: pass ? "passed" : "failed", assertion_type: "expectToolCall", expected: expectedToolCalls.join(", "), actualValue: actualToolCallNames.join(", "), }; }; export const expectToolCallArgs = ( actualToolCalls: ChatCompletionMessageToolCall[], expectedArgs: Array<{ name: string; arguments: any; shallow?: boolean }> ): AssertionResult => { const actualToolCallNamesAndArgs = actualToolCalls.map((tc) => ({ name: tc.function.name, arguments: JSON.parse(tc.function.arguments), })); const pass = actualToolCallNamesAndArgs.some((tc) => { return expectedArgs.some((ea) => { if (ea.name !== tc.name) { return false; } if (ea.shallow === true) { return Object.keys(ea.arguments).every((key) => { return isEqual(ea.arguments[key], tc.arguments[key]); }); } else { return isEqual(ea.arguments, tc.arguments); } }); }); return { status: pass ? "passed" : "failed", assertion_type: "expectToolCallArgs", expected: expectedArgs .map((ea) => `${ea.name}: ${JSON.stringify(ea.arguments)}`) .join(", "), actualValue: actualToolCallNamesAndArgs .map((tc) => `${tc.name}: ${JSON.stringify(tc.arguments)}`) .join(", "), }; }; export const llmCriteriaMet = async ( messages: ChatCompletionMessageParam[], criteria: string ): Promise<AssertionResult> => { const assistantMessages = messages .filter((m) => m.role === "assistant") .map((m) => m.content) .join("\n"); const closedQA = await ClosedQA({ client: braintrustOpenai, input: "According to the provided criterion is the submission correct?", criteria, output: assistantMessages, }); const pass = !!closedQA.score && closedQA.score > 0.5; return { status: pass ? "passed" : "failed", assertion_type: "llm_criteria_met", expected: criteria, actualValue: assistantMessages, }; }; export const assert = ( condition: boolean, message: string ): AssertionResult => { return { status: condition ? "passed" : "failed", assertion_type: "plain_assert", message, }; }; ``` -------------------------------------------------------------------------------- /modelcontextprotocol/src/test/index.test.ts: -------------------------------------------------------------------------------- ```typescript import {main} from '../index'; import {parseArgs} from '../index'; import {StripeAgentToolkit} from '@stripe/agent-toolkit/modelcontextprotocol'; import {StdioServerTransport} from '@modelcontextprotocol/sdk/server/stdio.js'; describe('parseArgs function', () => { describe('success cases', () => { it('should parse api-key, tools and stripe-header arguments correctly', () => { const args = [ '--api-key=sk_test_123', '--tools=all', '--stripe-account=acct_123', ]; const options = parseArgs(args); expect(options.apiKey).toBe('sk_test_123'); expect(options.tools).toEqual(['all']); expect(options.stripeAccount).toBe('acct_123'); }); it('should parse api-key and tools arguments correctly', () => { const args = ['--api-key=sk_test_123', '--tools=all']; const options = parseArgs(args); expect(options.apiKey).toBe('sk_test_123'); expect(options.tools).toEqual(['all']); }); it('should parse restricted api key correctly', () => { const args = ['--api-key=rk_test_123', '--tools=all']; const options = parseArgs(args); expect(options.apiKey).toBe('rk_test_123'); expect(options.tools).toEqual(['all']); }); it('if api key set in env variable, should parse tools argument correctly', () => { process.env.STRIPE_SECRET_KEY = 'sk_test_123'; const args = ['--tools=all']; const options = parseArgs(args); expect(options.apiKey).toBe('sk_test_123'); expect(options.tools).toEqual(['all']); }); it('if api key set in env variable but also passed into args, should prefer args key', () => { process.env.STRIPE_SECRET_KEY = 'sk_test_123'; const args = ['--api-key=sk_test_456', '--tools=all']; const options = parseArgs(args); expect(options.apiKey).toBe('sk_test_456'); expect(options.tools).toEqual(['all']); delete process.env.STRIPE_SECRET_KEY; }); it('should parse tools argument correctly if a list of tools is provided', () => { const args = [ '--api-key=sk_test_123', '--tools=customers.create,products.read,documentation.read', ]; const options = parseArgs(args); expect(options.tools).toEqual([ 'customers.create', 'products.read', 'documentation.read', ]); expect(options.apiKey).toBe('sk_test_123'); }); it('ignore all arguments not prefixed with --', () => { const args = [ '--api-key=sk_test_123', '--tools=all', 'stripe-account=acct_123', ]; const options = parseArgs(args); expect(options.apiKey).toBe('sk_test_123'); expect(options.tools).toEqual(['all']); expect(options.stripeAccount).toBeUndefined(); }); }); describe('error cases', () => { it("should throw an error if api-key does not start with 'sk_' or 'rk_'", () => { const args = ['--api-key=test_123', '--tools=all']; expect(() => parseArgs(args)).toThrow( 'API key must start with "sk_" or "rk_".' ); }); it('should throw an error if api-key is not provided', () => { const args = ['--tools=all']; expect(() => parseArgs(args)).toThrow( 'Stripe API key not provided. Please either pass it as an argument --api-key=$KEY or set the STRIPE_SECRET_KEY environment variable.' ); }); it("should throw an error if stripe-account does not start with 'acct_'", () => { const args = [ '--api-key=sk_test_123', '--tools=all', '--stripe-account=test_123', ]; expect(() => parseArgs(args)).toThrow( 'Stripe account must start with "acct_".' ); }); it('should throw an error if tools argument is not provided', () => { const args = ['--api-key=sk_test_123']; expect(() => parseArgs(args)).toThrow( 'The --tools arguments must be provided.' ); }); it('should throw an error if an invalid argument is provided', () => { const args = [ '--invalid-arg=value', '--api-key=sk_test_123', '--tools=all', ]; expect(() => parseArgs(args)).toThrow( 'Invalid argument: invalid-arg. Accepted arguments are: api-key, tools, stripe-account' ); }); it('should throw an error if tools is not in accepted tool list', () => { const args = [ '--api-key=sk_test_123', '--tools=customers.create,products.read,fake.tool', ]; expect(() => parseArgs(args)).toThrow( /Invalid tool: fake\.tool\. Accepted tools are: .*$/ ); }); }); }); jest.mock('@stripe/agent-toolkit/modelcontextprotocol'); jest.mock('@modelcontextprotocol/sdk/server/stdio.js'); describe('main function', () => { beforeEach(() => { jest.clearAllMocks(); }); it('should initialize the server with tools=all correctly', async () => { process.argv = ['node', 'index.js', '--api-key=sk_test_123', '--tools=all']; await main(); expect(StripeAgentToolkit).toHaveBeenCalledWith({ secretKey: 'sk_test_123', configuration: { actions: ALL_ACTIONS, context: {mode: 'modelcontextprotocol'}, }, }); expect(StdioServerTransport).toHaveBeenCalled(); }); it('should initialize the server with specific list of tools correctly', async () => { process.argv = [ 'node', 'index.js', '--api-key=sk_test_123', '--tools=customers.create,products.read,documentation.read', ]; await main(); expect(StripeAgentToolkit).toHaveBeenCalledWith({ secretKey: 'sk_test_123', configuration: { actions: { customers: { create: true, }, products: { read: true, }, documentation: { read: true, }, }, context: { mode: 'modelcontextprotocol', }, }, }); expect(StdioServerTransport).toHaveBeenCalled(); }); it('should initialize the server with stripe header', async () => { process.argv = [ 'node', 'index.js', '--api-key=sk_test_123', '--tools=all', '--stripe-account=acct_123', ]; await main(); expect(StripeAgentToolkit).toHaveBeenCalledWith({ secretKey: 'sk_test_123', configuration: { actions: ALL_ACTIONS, context: {account: 'acct_123', mode: 'modelcontextprotocol'}, }, }); expect(StdioServerTransport).toHaveBeenCalled(); }); }); const ALL_ACTIONS = { customers: { create: true, read: true, }, coupons: { create: true, read: true, }, invoices: { create: true, update: true, read: true, }, invoiceItems: { create: true, }, paymentLinks: { create: true, }, products: { create: true, read: true, }, prices: { create: true, read: true, }, balance: { read: true, }, refunds: { create: true, }, subscriptions: { read: true, update: true, }, paymentIntents: { read: true, }, disputes: { read: true, update: true, }, documentation: { read: true, }, }; ``` -------------------------------------------------------------------------------- /typescript/src/test/shared/subscriptions/functions.test.ts: -------------------------------------------------------------------------------- ```typescript import {listSubscriptions} from '@/shared/subscriptions/listSubscriptions'; import {cancelSubscription} from '@/shared/subscriptions/cancelSubscription'; import {updateSubscription} from '@/shared/subscriptions/updateSubscription'; const Stripe = jest.fn().mockImplementation(() => ({ subscriptions: { list: jest.fn(), cancel: jest.fn(), update: jest.fn(), }, })); let stripe: ReturnType<typeof Stripe>; beforeEach(() => { stripe = new Stripe('fake-api-key'); }); describe('listSubscriptions', () => { it('should list subscriptions and return data', async () => { const mockSubscriptions = [ { id: 'sub_123456', customer: 'cus_123456', status: 'active', current_period_start: 1609459200, // 2021-01-01 current_period_end: 1612137600, // 2021-02-01 items: { data: [ { id: 'si_123', price: 'price_123', quantity: 1, }, ], }, }, { id: 'sub_789012', customer: 'cus_123456', status: 'canceled', current_period_start: 1609459200, // 2021-01-01 current_period_end: 1612137600, // 2021-02-01 items: { data: [ { id: 'si_456', price: 'price_456', quantity: 2, }, ], }, }, ]; const context = {}; const params = {}; stripe.subscriptions.list.mockResolvedValue({data: mockSubscriptions}); const result = await listSubscriptions(stripe, context, params); expect(stripe.subscriptions.list).toHaveBeenCalledWith(params, undefined); expect(result).toEqual(mockSubscriptions); }); it('should add customer from context if provided', async () => { const mockSubscriptions = [ { id: 'sub_123456', customer: 'cus_123456', status: 'active', current_period_start: 1609459200, current_period_end: 1612137600, items: { data: [ { id: 'si_123', price: 'price_123', quantity: 1, }, ], }, }, ]; const context = { customer: 'cus_123456', }; const params = {}; stripe.subscriptions.list.mockResolvedValue({data: mockSubscriptions}); const result = await listSubscriptions(stripe, context, params); expect(stripe.subscriptions.list).toHaveBeenCalledWith( {customer: 'cus_123456'}, undefined ); expect(result).toEqual(mockSubscriptions); }); it('should specify the connected account if included in context', async () => { const mockSubscriptions = [ { id: 'sub_123456', customer: 'cus_123456', status: 'active', current_period_start: 1609459200, current_period_end: 1612137600, items: { data: [ { id: 'si_123', price: 'price_123', quantity: 1, }, ], }, }, ]; const context = { account: 'acct_123456', }; const params = {}; stripe.subscriptions.list.mockResolvedValue({data: mockSubscriptions}); const result = await listSubscriptions(stripe, context, params); expect(stripe.subscriptions.list).toHaveBeenCalledWith(params, { stripeAccount: context.account, }); expect(result).toEqual(mockSubscriptions); }); it('should handle errors gracefully', async () => { const context = {}; const params = {}; stripe.subscriptions.list.mockRejectedValue(new Error('API Error')); const result = await listSubscriptions(stripe, context, params); expect(result).toBe('Failed to list subscriptions'); }); }); describe('cancelSubscription', () => { it('should cancel a subscription and return the result', async () => { const mockSubscription = { id: 'sub_123456', customer: 'cus_123456', status: 'active', current_period_start: 1609459200, current_period_end: 1612137600, items: { data: [ { id: 'si_123', price: 'price_123', quantity: 1, }, ], }, }; const context = {}; const params = { subscription: 'sub_123456', }; stripe.subscriptions.cancel.mockResolvedValue(mockSubscription); const result = await cancelSubscription(stripe, context, params); expect(stripe.subscriptions.cancel).toHaveBeenCalledWith( 'sub_123456', {}, undefined ); expect(result).toEqual(mockSubscription); }); it('should handle errors gracefully', async () => { const context = {}; const params = { subscription: 'sub_123456', }; stripe.subscriptions.cancel.mockRejectedValue(new Error('API Error')); const result = await cancelSubscription(stripe, context, params); expect(result).toBe('Failed to cancel subscription'); }); }); describe('updateSubscription', () => { it('should update a subscription and return the result', async () => { const mockSubscription = { id: 'sub_123456', customer: 'cus_123456', status: 'active', current_period_start: 1609459200, current_period_end: 1612137600, items: { data: [ { id: 'si_123', price: 'price_123', quantity: 1, }, ], }, }; const context = {}; const params = { subscription: 'sub_123456', items: [ { id: 'si_123', quantity: 2, }, ], }; stripe.subscriptions.update.mockResolvedValue(mockSubscription); const result = await updateSubscription(stripe, context, params); expect(stripe.subscriptions.update).toHaveBeenCalledWith( 'sub_123456', { items: [ { id: 'si_123', quantity: 2, }, ], }, undefined ); expect(result).toEqual(mockSubscription); }); it('should handle errors gracefully', async () => { const context = {}; const params = { subscription: 'sub_123456', items: [ { id: 'si_123', quantity: 2, }, ], }; stripe.subscriptions.update.mockRejectedValue(new Error('API Error')); const result = await updateSubscription(stripe, context, params); expect(result).toBe('Failed to update subscription'); }); it('should specify the connected account if included in context', async () => { const mockSubscription = { id: 'sub_123456', customer: 'cus_123456', status: 'active', current_period_start: 1609459200, current_period_end: 1612137600, items: { data: [ { id: 'si_123', price: 'price_123', quantity: 1, }, ], }, }; const context = { account: 'acct_123456', }; const params = { subscription: 'sub_123456', cancel_at_period_end: true, }; stripe.subscriptions.update.mockResolvedValue(mockSubscription); const result = await updateSubscription(stripe, context, params); expect(stripe.subscriptions.update).toHaveBeenCalledWith( 'sub_123456', { cancel_at_period_end: true, }, { stripeAccount: context.account, } ); expect(result).toEqual(mockSubscription); }); }); ``` -------------------------------------------------------------------------------- /typescript/src/test/modelcontextprotocol/register-paid-tool.test.ts: -------------------------------------------------------------------------------- ```typescript import {z} from 'zod'; import type {McpServer} from '@modelcontextprotocol/sdk/server/mcp.js'; import {registerPaidTool} from '../../modelcontextprotocol/register-paid-tool'; import Stripe from 'stripe'; import type { ServerNotification, ServerRequest, } from '@modelcontextprotocol/sdk/types.js'; import type {RequestHandlerExtra} from '@modelcontextprotocol/sdk/shared/protocol.js'; // Mock Stripe jest.mock('stripe'); const mockSecretKey = 'sk_test_123'; describe('registerPaidTool', () => { let mockMcpServer: jest.Mocked<McpServer>; let mockStripe: jest.Mocked<any>; let mockExtra: RequestHandlerExtra<ServerRequest, ServerNotification>; beforeEach(() => { // Reset all mocks jest.clearAllMocks(); // Mock McpServer mockMcpServer = { tool: jest.fn(), } as any; // Mock Stripe instance and methods mockStripe = { customers: { list: jest.fn(), create: jest.fn(), }, checkout: { sessions: { create: jest.fn(), retrieve: jest.fn(), list: jest.fn(), }, }, subscriptions: { list: jest.fn(), }, billing: { meterEvents: { create: jest.fn(), }, }, }; (Stripe as unknown as jest.Mock).mockImplementation(() => mockStripe); // Mock request handler extra mockExtra = { signal: new AbortController().signal, sendNotification: jest.fn(), sendRequest: jest.fn(), requestId: '123', }; }); it('should register a tool with the McpServer', async () => { const toolName = 'testTool'; const toolDescription = 'Test tool description'; const paramsSchema = { testParam: z.string(), }; const callback = jest.fn(); // @ts-ignore: https://github.com/modelcontextprotocol/typescript-sdk/issues/494 await registerPaidTool( mockMcpServer, toolName, toolDescription, paramsSchema, callback, { paymentReason: 'Test payment', stripeSecretKey: mockSecretKey, userEmail: '[email protected]', checkout: { success_url: 'https://example.com/success', line_items: [{price: 'price_123', quantity: 1}], mode: 'subscription', }, } ); expect(mockMcpServer.tool).toHaveBeenCalledWith( toolName, toolDescription, paramsSchema, expect.any(Function) ); }); it('should create a new customer if one does not exist', async () => { mockStripe.customers.list.mockResolvedValue({data: []}); mockStripe.customers.create.mockResolvedValue({id: 'cus_123'}); mockStripe.subscriptions.list.mockResolvedValue({ data: [ { items: { data: [ { price: { id: 'price_123', }, }, ], }, }, ], }); mockStripe.checkout.sessions.list.mockResolvedValue({data: []}); mockStripe.checkout.sessions.create.mockResolvedValue({ id: 'cs_123', url: 'https://checkout.stripe.com/123', }); const toolName = 'testTool'; const callback = jest.fn(); await registerPaidTool( mockMcpServer, toolName, 'Test description', {testParam: z.string()}, callback, { paymentReason: 'Test payment', stripeSecretKey: mockSecretKey, userEmail: '[email protected]', checkout: { success_url: 'https://example.com/success', line_items: [{price: 'price_123', quantity: 1}], mode: 'subscription', }, } ); const registeredCallback = mockMcpServer.tool.mock.calls[0]?.[3]; // @ts-ignore: TypeScript can't disambiguate between params schema and annotations await registeredCallback({testParam: 'test'}, mockExtra); expect(mockStripe.customers.list).toHaveBeenCalledWith({ email: '[email protected]', }); expect(mockStripe.customers.create).toHaveBeenCalledWith({ email: '[email protected]', }); }); it('should create a checkout session for unpaid tools', async () => { mockStripe.customers.list.mockResolvedValue({ data: [{id: 'cus_123', email: '[email protected]'}], }); mockStripe.checkout.sessions.create.mockResolvedValue({ id: 'cs_123', url: 'https://checkout.stripe.com/123', }); mockStripe.subscriptions.list.mockResolvedValue({ data: [], // No active subscriptions }); mockStripe.checkout.sessions.list.mockResolvedValue({ data: [], // No paid sessions }); const toolName = 'testTool'; const callback = jest.fn(); await registerPaidTool( mockMcpServer, toolName, 'Test description', {testParam: z.string()}, callback, { paymentReason: 'Test payment', stripeSecretKey: mockSecretKey, userEmail: '[email protected]', checkout: { success_url: 'https://example.com/success', line_items: [{price: 'price_123', quantity: 1}], mode: 'subscription', }, } ); const registeredCallback = mockMcpServer.tool.mock.calls[0]?.[3]; // @ts-ignore: TypeScript can't disambiguate between params schema and annotations const result = await registeredCallback({testParam: 'test'}, mockExtra); expect(mockStripe.checkout.sessions.create).toHaveBeenCalledWith({ success_url: 'https://example.com/success', line_items: [ { price: 'price_123', quantity: 1, }, ], mode: 'subscription', customer: 'cus_123', metadata: {toolName}, }); expect(result).toEqual({ content: [ { type: 'text', text: JSON.stringify({ status: 'payment_required', data: { paymentType: 'oneTimeSubscription', checkoutUrl: 'https://checkout.stripe.com/123', paymentReason: 'Test payment', }, }), }, ], }); expect(callback).not.toHaveBeenCalled(); }); it('should handle usage-based billing when meterEvent is provided', async () => { const toolName = 'testTool'; mockStripe.customers.list.mockResolvedValue({ data: [{id: 'cus_123', email: '[email protected]'}], }); mockStripe.checkout.sessions.list.mockResolvedValue({ data: [ { id: 'cs_123', metadata: {toolName}, payment_status: 'paid', subscription: 'sub_123', }, ], }); mockStripe.subscriptions.list.mockResolvedValue({ data: [ { items: { data: [ { price: { id: 'price_123', }, }, ], }, }, ], }); const callback = jest.fn().mockResolvedValue({ content: [{type: 'text', text: 'Success'}], }); await registerPaidTool( mockMcpServer, toolName, 'Test description', {testParam: z.string()}, callback, { paymentReason: 'Test payment', meterEvent: 'test.event', stripeSecretKey: mockSecretKey, userEmail: '[email protected]', checkout: { success_url: 'https://example.com/success', line_items: [{price: 'price_123'}], mode: 'subscription', }, } ); const registeredCallback = mockMcpServer.tool.mock.calls[0]?.[3]; // @ts-ignore: TypeScript can't disambiguate between params schema and annotations await registeredCallback({testParam: 'test'}, mockExtra); expect(mockStripe.billing.meterEvents.create).toHaveBeenCalledWith({ event_name: 'test.event', payload: { stripe_customer_id: 'cus_123', value: '1', }, }); }); }); ``` -------------------------------------------------------------------------------- /evals/cases.ts: -------------------------------------------------------------------------------- ```typescript require("dotenv").config(); import { assert, EvalCaseFunction, EvalInput, expectToolCall, expectToolCallArgs, llmCriteriaMet, } from "./scorer"; import { Configuration as StripeAgentToolkitConfig } from "../typescript/src/shared/configuration"; import Stripe from "stripe"; /* * A single test case that is used to evaluate the agent. * It contains an input, a toolkit config, and an function to use to run * assertions on the output of the agent. It is structured to be used with * Braintrust. */ type BraintrustTestCase = { input: EvalInput; toolkitConfig?: StripeAgentToolkitConfig; expected: EvalCaseFunction; }; /* This is used in a Braintrust Eval. Our test framework appends new test cases to this array.*/ const _testCases: Array<BraintrustTestCase | Promise<BraintrustTestCase>> = []; /* * Helper type for adding test cases to the Braintrust Eval. */ type TestCaseData = { // The user prompt to pass into the agent. prompt: string; // The function to use to run assertions on the output of the agent. fn: EvalCaseFunction; // Optional toolkit config to set into the agent to override the default set in eval.ts. toolkitConfig?: StripeAgentToolkitConfig; }; const argsToTestCase = (args: TestCaseData): BraintrustTestCase => ({ input: { toolkitConfigOverride: args.toolkitConfig || {}, userPrompt: args.prompt, }, expected: args.fn, }); /* * Helper function for adding test cases to the Braintrust Eval. */ const test = (args: TestCaseData | (() => Promise<TestCaseData>)) => { if (typeof args == "function") { const promise = args().then(argsToTestCase); _testCases.push(promise); } else { _testCases.push(argsToTestCase(args)); } }; test({ prompt: "Create a product called 'Test Product' with a description 'A test product for evaluation'", fn: ({ toolCalls, messages }) => [ expectToolCall(toolCalls, ["create_product"]), llmCriteriaMet( messages, "The message should include a successful production creation response" ), ], }); test({ prompt: "List all available products", fn: ({ toolCalls, messages }) => [ expectToolCall(toolCalls, ["list_products"]), llmCriteriaMet(messages, "The message should include a list of products"), ], }); test({ prompt: "Create a customer with a name of a Philadelphia Eagles player and email (you can make it up). Charge them $100.", fn: ({ toolCalls, messages }) => [ expectToolCall(toolCalls, ["create_customer"]), ], }); test({ prompt: "Create a payment link for a new product called 'test' with a price of $70. Come up with a haiku for the description.", fn: ({ toolCalls, messages }) => [ llmCriteriaMet(messages, "The message should include a payment link"), expectToolCall(toolCalls, ["create_payment_link"]), ], }); test({ prompt: "Create a payment link for a new product called 'test' with a price of $35.99, if the user completes the purchase they should be redirected to https://www.stripe.com", fn: ({ toolCalls, messages }) => [ expectToolCall(toolCalls, ["create_payment_link"]), expectToolCallArgs(toolCalls, [ { name: "create_payment_link", arguments: { redirect_url: "https://www.stripe.com", }, shallow: true, }, ]), llmCriteriaMet(messages, "The message should include a payment link and indicate the redirect url"), ], }); const stripe = new Stripe(process.env.STRIPE_SECRET_KEY!); test(async () => { const customer = await stripe.customers.create({ name: "Joel E", email: "[email protected]", }); const joelsPayment = await stripe.paymentIntents.create({ amount: 2000, currency: "usd", customer: customer.id, }); const otherPi = await stripe.paymentIntents.create({ amount: 3000, currency: "usd", }); return { prompt: "List payment intents", toolkitConfig: { context: { customer: customer.id, }, }, fn: ({ assistantMessages }) => [ assert( (function () { return ( assistantMessages.some((m) => m.includes(joelsPayment.id)) && assistantMessages.every((m) => !m.includes(otherPi.id)) ); })(), `messages only includes customers payment intent ${joelsPayment.id}` ), ], }; }); test({ prompt: "List all subscriptions", fn: ({ toolCalls, messages }) => [ llmCriteriaMet( messages, "The message should include a list of subscriptions" ), expectToolCall(toolCalls, ["list_subscriptions"]), ], }); test({ prompt: "Create a coupon called SUMMER25 that gives 25% off", fn: ({ toolCalls, messages }) => [ expectToolCall(toolCalls, ["create_coupon"]), llmCriteriaMet( messages, "The message should include a coupon creation response" ), ], }); test({ prompt: "Create a coupon called WINTERTEN that gives $10 off", fn: ({ toolCalls, messages }) => [ expectToolCall(toolCalls, ["create_coupon"]), expectToolCallArgs(toolCalls, [ { name: "create_coupon", arguments: { amount_off: 1000, currency: "USD", name: "WINTERTEN", } }, ]), llmCriteriaMet( messages, "The message should include a coupon creation response" ), ], }); test({ prompt: "List all coupons", fn: ({ toolCalls, messages }) => [ expectToolCall(toolCalls, ["list_coupons"]), ], }); test(async () => { const customer = await stripe.customers.create({ name: "Joel E", email: "[email protected]", payment_method: "pm_card_visa", }); const paymentMethod = await stripe.paymentMethods.create({ type: "card", card: { token: "tok_visa", }, }); await stripe.paymentMethods.attach(paymentMethod.id, { customer: customer.id, }); // // Set as default payment method await stripe.customers.update(customer.id, { invoice_settings: { default_payment_method: paymentMethod.id }, }); const product = await stripe.products.create({ name: "Subscription Product", description: "A test subscription product", }); const price = await stripe.prices.create({ product: product.id, unit_amount: 1000, currency: "usd", recurring: { interval: "month" }, }); await stripe.subscriptions.create({ customer: customer.id, items: [{ price: price.id }], }); return { prompt: `Cancel the users subscription`, toolkitConfig: { context: { customer: customer.id, }, }, fn: ({ toolCalls, messages }) => [ expectToolCall(toolCalls, ["list_subscriptions", "cancel_subscription"]), llmCriteriaMet( messages, "The message should include a successful subscription cancellation response" ), ], }; }); // New test for update subscription test(async () => { const customer = await stripe.customers.create({ name: "User Example", email: "[email protected]", }); const paymentMethod = await stripe.paymentMethods.create({ type: "card", card: { token: "tok_visa", }, }); await stripe.paymentMethods.attach(paymentMethod.id, { customer: customer.id, }); // // Set as default payment method await stripe.customers.update(customer.id, { invoice_settings: { default_payment_method: paymentMethod.id }, }); const product = await stripe.products.create({ name: "SaaS Product", description: "A test subscription product", }); const basicPrice = await stripe.prices.create({ product: product.id, unit_amount: 1000, currency: "usd", recurring: { interval: "month" }, }); const premiumPrice = await stripe.prices.create({ product: product.id, unit_amount: 2000, currency: "usd", recurring: { interval: "month" }, }); const subscription = await stripe.subscriptions.create({ customer: customer.id, items: [{ price: basicPrice.id, quantity: 1 }], }); return { prompt: `Upgrade the user's subscription to the premium plan`, toolkitConfig: { context: { customer: customer.id, }, }, fn: ({ toolCalls, messages }) => [ expectToolCall(toolCalls, ["list_subscriptions", "update_subscription"]), llmCriteriaMet( messages, "The message should include a successful subscription update response. The subscription should have been updated to the premium plan and have only one item." ), ], }; }); test({ prompt: "List all disputes", fn: ({ toolCalls, messages }) => [ expectToolCall(toolCalls, ["list_disputes"]), ], }); export const getEvalTestCases = async () => Promise.all(_testCases); ``` -------------------------------------------------------------------------------- /python/examples/openai/customer_support/emailer.py: -------------------------------------------------------------------------------- ```python # pyright: strict import imaplib import email import smtplib from email.mime.text import MIMEText from email.message import Message from email.mime.multipart import MIMEMultipart from email.utils import parseaddr from typing import List, Tuple, Callable, Union, Awaitable import asyncio import json import re from datetime import datetime from email.utils import parsedate_to_datetime class Email: def __init__( self, from_address: str, to_address: str, subject: str, body: str, id: str = "", date: datetime = datetime.now(), ): self.id = id self.to_address = to_address self.from_address = from_address self.subject = subject self.body = body self.date = date def to_message(self, reply_id: str, reply_to: str) -> MIMEMultipart: msg = MIMEMultipart() msg["From"] = self.from_address msg["To"] = self.to_address msg["Subject"] = self.subject msg["In-Reply-To"] = reply_id msg["References"] = reply_id msg["Reply-To"] = reply_to msg.attach(MIMEText(f"<html><body>{self.body}</body></html>", "html")) return msg def to_dict(self): return { "id": self.id, "to": self.to_address, "from": self.from_address, "subject": self.subject, "body": self.body, "date": self.date.strftime("%a, %d %b %Y %H:%M:%S %z"), } class Emailer: """ Emailer is an IMAP/SMTP client that can be used to fetch and respond to emails. It was mostly vibe-coded so please make improvements! TODO: add agent replies to the context """ def __init__( self, email_address: str, email_password: str, support_address: str = "", imap_server: str = "imap.gmail.com", imap_port: int = 993, smtp_server: str = "smtp.gmail.com", smtp_port: int = 587, ): # Email configuration self.email_address = email_address self.support_address = support_address if support_address else email_address self.email_password = email_password self.imap_server = imap_server self.imap_port = imap_port self.smtp_server = smtp_server self.smtp_port = smtp_port def _connect_to_email(self) -> Tuple[imaplib.IMAP4_SSL, smtplib.SMTP]: """Establish connections to email servers.""" # Connect to IMAP server imap_conn = imaplib.IMAP4_SSL(self.imap_server, self.imap_port) imap_conn.login(self.email_address, self.email_password) # Connect to SMTP server smtp_conn = smtplib.SMTP(self.smtp_server, self.smtp_port) smtp_conn.starttls() smtp_conn.login(self.email_address, self.email_password) return imap_conn, smtp_conn def _get_body(self, email_message: Message) -> str: body: str = "" if email_message.is_multipart(): for part in email_message.walk(): if part.get_content_type() == "text/plain": payload = part.get_payload(decode=True) if isinstance(payload, bytes): body = payload.decode() break else: payload = email_message.get_payload(decode=True) if isinstance(payload, bytes): body = payload.decode() else: body = str(payload) return self._strip_replies(body) def _strip_replies(self, raw_body: str) -> str: lines = raw_body.split("\n") pruned: List[str] = [] for line in lines: # Stop if we see a typical reply indicator if line.strip().startswith("On ") and " wrote:" in line: break pruned.append(line) return "\n".join(pruned).strip() def _parse_email( self, imap_conn: imaplib.IMAP4_SSL, email_id: bytes ) -> Union[Email, None]: _, msg_data = imap_conn.fetch(email_id.decode(), "(BODY.PEEK[])") if not msg_data or not msg_data[0]: return None msg_resp = msg_data[0] if isinstance(msg_resp, tuple) and len(msg_resp) == 2: email_body = msg_resp[1] else: return None email_message = email.message_from_bytes(email_body) subject = email_message["subject"] or "" from_address = parseaddr(email_message.get("From", ""))[1] to_address = parseaddr(email_message.get("To", ""))[1] date_str = email_message.get("Date", "") date = datetime.now() if date_str: try: date = parsedate_to_datetime(date_str) except Exception: pass body = self._get_body(email_message) return Email( id=email_id.decode(), from_address=from_address, to_address=to_address, subject=subject, body=body, date=date, ) def _get_email_thread( self, imap_conn: imaplib.IMAP4_SSL, email_id_bytes: bytes ) -> List[Email]: email = self._parse_email(imap_conn, email_id_bytes) if not email: return [] thread = [email] # Try thread via X-GM-THRID (Gmail extension) _, thrid_data = imap_conn.fetch(email.id, "(X-GM-THRID)") match = None if thrid_data and thrid_data[0]: data = thrid_data[0] if isinstance(data, bytes): match = re.search(r"X-GM-THRID\s+(\d+)", data.decode()) else: match = re.search(r"X-GM-THRID\s+(\d+)", str(data)) if match: thread_id = match.group(1) _, thread_ids = imap_conn.search(None, f"X-GM-THRID {thread_id}") if thread_ids and thread_ids[0]: thread = [ self._parse_email(imap_conn, mid) for mid in thread_ids[0].split() ] thread = [e for e in thread if e] thread.sort(key=lambda e: e.date) return thread # Fallback: use REFERENCES header _, ref_data = imap_conn.fetch( email.id, "(BODY.PEEK[HEADER.FIELDS (REFERENCES)])" ) if ref_data and ref_data[0]: ref_line = ( ref_data[0][1].decode() if isinstance(ref_data[0][1], bytes) else "" ) refs = re.findall(r"<([^>]+)>", ref_line) for ref in refs: _, ref_ids = imap_conn.search(None, f'(HEADER Message-ID "<{ref}>")') if ref_ids and ref_ids[0]: for ref_id in ref_ids[0].split(): ref_email = self._parse_email(imap_conn, ref_id) if ref_email and ref_email.id not in [e.id for e in thread]: thread.append(ref_email) # Sort emails in the thread by date (ascending order) thread.sort(key=lambda e: e.date) return thread return thread def _get_unread_emails(self, imap_conn: imaplib.IMAP4_SSL) -> List[List[Email]]: imap_conn.select("INBOX") _, msg_nums = imap_conn.search(None, f'(UNSEEN TO "{self.support_address}")') emails: List[List[Email]] = [] for email_id in msg_nums[0].split(): thread = self._get_email_thread(imap_conn, email_id) emails.append(thread) return emails def mark_as_read(self, imap_conn: imaplib.IMAP4_SSL, message_id: str): imap_conn.store(message_id, "+FLAGS", "\\Seen") def get_email_thread(self, email_id: str) -> List[Email]: # Connect to email servers imap_conn, smtp_conn = self._connect_to_email() imap_conn.select("INBOX") # Get the thread thread = self._get_email_thread( imap_conn=imap_conn, email_id_bytes=email_id.encode() ) # Close connections imap_conn.logout() smtp_conn.quit() return thread async def process( self, respond: Callable[[List[Email]], Awaitable[Union[Email, None]]], mark_read: bool = True, ): # Connect to email servers imap_conn, smtp_conn = self._connect_to_email() # Get unread emails print("Fetching unread emails...") unread_emails = self._get_unread_emails(imap_conn) for email_thread in unread_emails: # Get the most recent email in the thread most_recent = email_thread[-1] # Generate the response response = await respond(email_thread) # If there is no response, skip this email and keep as unread # in the inbox if response is None: continue # Send the response # Get the most recent email in the thread to reply to print( f"Replying to '{response.to_address}' with:\n {json.dumps(response.body)}" ) smtp_conn.send_message( response.to_message(most_recent.id, self.support_address) ) # Mark the original email as read if mark_read: self.mark_as_read(imap_conn, most_recent.id) # Close connections imap_conn.logout() smtp_conn.quit() async def run( self, respond: Callable[[List[Email]], Awaitable[Union[Email, None]]], mark_read: bool = True, delay: int = 60, ): while True: # Process emails await self.process(respond, mark_read) # Wait before next check print(f"Sleeping for {delay}s...") await asyncio.sleep(delay) ``` -------------------------------------------------------------------------------- /modelcontextprotocol/eslint.config.mjs: -------------------------------------------------------------------------------- ``` import prettier from "eslint-plugin-prettier"; import _import from "eslint-plugin-import"; import { fixupPluginRules } from "@eslint/compat"; import globals from "globals"; import typescriptEslint from "@typescript-eslint/eslint-plugin"; import path from "node:path"; import { fileURLToPath } from "node:url"; import js from "@eslint/js"; import { FlatCompat } from "@eslint/eslintrc"; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); const compat = new FlatCompat({ baseDirectory: __dirname, recommendedConfig: js.configs.recommended, allConfig: js.configs.all }); export default [...compat.extends("plugin:prettier/recommended"), { plugins: { prettier, import: fixupPluginRules(_import), }, languageOptions: { globals: { ...globals.node, }, ecmaVersion: 2018, sourceType: "commonjs", }, rules: { "accessor-pairs": "error", "array-bracket-spacing": ["error", "never"], "array-callback-return": "off", "arrow-parens": "error", "arrow-spacing": "error", "block-scoped-var": "off", "block-spacing": "off", "brace-style": ["error", "1tbs", { allowSingleLine: true, }], "capitalized-comments": "off", "class-methods-use-this": "off", "comma-dangle": "off", "comma-spacing": "off", "comma-style": ["error", "last"], complexity: "error", "computed-property-spacing": ["error", "never"], "consistent-return": "off", "consistent-this": "off", curly: "error", "default-case": "off", "dot-location": ["error", "property"], "dot-notation": "error", "eol-last": "error", eqeqeq: "off", "func-call-spacing": "error", "func-name-matching": "error", "func-names": "off", "func-style": ["error", "declaration", { allowArrowFunctions: true, }], "generator-star-spacing": "error", "global-require": "off", "guard-for-in": "error", "handle-callback-err": "off", "id-blacklist": "error", "id-length": "off", "id-match": "error", "import/extensions": "off", "init-declarations": "off", "jsx-quotes": "error", "key-spacing": "error", "keyword-spacing": ["error", { after: true, before: true, }], "line-comment-position": "off", "linebreak-style": ["error", "unix"], "lines-around-directive": "error", "max-depth": "error", "max-len": "off", "max-lines": "off", "max-nested-callbacks": "error", "max-params": "off", "max-statements": "off", "max-statements-per-line": "off", "multiline-ternary": "off", "new-cap": "off", "new-parens": "error", "newline-after-var": "off", "newline-before-return": "off", "newline-per-chained-call": "off", "no-alert": "error", "no-array-constructor": "error", "no-await-in-loop": "error", "no-bitwise": "off", "no-caller": "error", "no-catch-shadow": "off", "no-compare-neg-zero": "error", "no-confusing-arrow": "error", "no-continue": "off", "no-div-regex": "error", "no-duplicate-imports": "off", "no-else-return": "off", "no-empty-function": "off", "no-eq-null": "off", "no-eval": "error", "no-extend-native": "error", "no-extra-bind": "error", "no-extra-label": "error", "no-extra-parens": "off", "no-floating-decimal": "error", "no-implicit-globals": "error", "no-implied-eval": "error", "no-inline-comments": "off", "no-inner-declarations": ["error", "functions"], "no-invalid-this": "off", "no-iterator": "error", "no-label-var": "error", "no-labels": "error", "no-lone-blocks": "error", "no-lonely-if": "error", "no-loop-func": "error", "no-magic-numbers": "off", "no-mixed-requires": "error", "no-multi-assign": "off", "no-multi-spaces": "error", "no-multi-str": "error", "no-multiple-empty-lines": "error", "no-native-reassign": "error", "no-negated-condition": "off", "no-negated-in-lhs": "error", "no-nested-ternary": "error", "no-new": "error", "no-new-func": "error", "no-new-object": "error", "no-new-require": "error", "no-new-wrappers": "error", "no-octal-escape": "error", "no-param-reassign": "off", "no-path-concat": "error", "no-plusplus": ["error", { allowForLoopAfterthoughts: true, }], "no-process-env": "off", "no-process-exit": "error", "no-proto": "error", "no-prototype-builtins": "off", "no-restricted-globals": "error", "no-restricted-imports": "error", "no-restricted-modules": "error", "no-restricted-properties": "error", "no-restricted-syntax": "error", "no-return-assign": "error", "no-return-await": "error", "no-script-url": "error", "no-self-compare": "error", "no-sequences": "error", "no-shadow": "off", "no-shadow-restricted-names": "error", "no-spaced-func": "error", "no-sync": "error", "no-tabs": "error", "no-template-curly-in-string": "error", "no-ternary": "off", "no-throw-literal": "error", "no-trailing-spaces": "error", "no-undef-init": "error", "no-undefined": "off", "no-underscore-dangle": "off", "no-unmodified-loop-condition": "error", "no-unneeded-ternary": "error", "no-unused-expressions": "error", "no-unused-vars": ["error", { args: "none", }], "no-use-before-define": "off", "no-useless-call": "error", "no-useless-computed-key": "error", "no-useless-concat": "error", "no-useless-constructor": "error", "no-useless-escape": "off", "no-useless-rename": "error", "no-useless-return": "error", "no-var": "off", "no-void": "error", "no-warning-comments": "error", "no-whitespace-before-property": "error", "no-with": "error", "nonblock-statement-body-position": "error", "object-curly-newline": "off", "object-curly-spacing": ["error", "never"], "object-property-newline": "off", "object-shorthand": "off", "one-var": "off", "one-var-declaration-per-line": "error", "operator-assignment": ["error", "always"], "operator-linebreak": "off", "padded-blocks": "off", "prefer-arrow-callback": "off", "prefer-const": "error", "prefer-destructuring": ["error", { array: false, object: false, }], "prefer-numeric-literals": "error", "prefer-promise-reject-errors": "error", "prefer-reflect": "off", "prefer-rest-params": "off", "prefer-spread": "off", "prefer-template": "off", "quote-props": "off", quotes: ["error", "single", { avoidEscape: true, }], radix: "error", "require-await": "error", "require-jsdoc": "off", "rest-spread-spacing": "error", semi: "off", "semi-spacing": ["error", { after: true, before: false, }], "sort-imports": "off", "sort-keys": "off", "sort-vars": "error", "space-before-blocks": "error", "space-before-function-paren": "off", "space-in-parens": ["error", "never"], "space-infix-ops": "error", "space-unary-ops": "error", "spaced-comment": ["error", "always"], strict: "off", "symbol-description": "error", "template-curly-spacing": "error", "template-tag-spacing": "error", "unicode-bom": ["error", "never"], "valid-jsdoc": "off", "vars-on-top": "off", "wrap-regex": "off", "yield-star-spacing": "error", yoda: ["error", "never"], }, }, ...compat.extends( "eslint:recommended", "plugin:@typescript-eslint/eslint-recommended", "plugin:@typescript-eslint/recommended", "plugin:prettier/recommended", ).map(config => ({ ...config, files: ["**/*.ts"], })), { files: ["**/*.ts"], plugins: { "@typescript-eslint": typescriptEslint, prettier, }, rules: { "@typescript-eslint/no-use-before-define": 0, "@typescript-eslint/no-empty-interface": 0, "@typescript-eslint/no-unused-vars": 0, "@typescript-eslint/triple-slash-reference": 0, "@typescript-eslint/ban-ts-comment": "off", "@typescript-eslint/no-empty-function": 0, "@typescript-eslint/no-require-imports": 0, "@typescript-eslint/naming-convention": ["error", { selector: "default", format: ["camelCase", "UPPER_CASE", "PascalCase"], leadingUnderscore: "allow", }, { selector: "property", format: null, }], "@typescript-eslint/no-explicit-any": 0, "@typescript-eslint/explicit-function-return-type": "off", "@typescript-eslint/no-this-alias": "off", "@typescript-eslint/no-var-requires": 0, "prefer-rest-params": "off", }, }, { files: ["test/**/*.ts"], rules: { "@typescript-eslint/explicit-function-return-type": "off", }, }]; ``` -------------------------------------------------------------------------------- /typescript/eslint.config.mjs: -------------------------------------------------------------------------------- ``` import prettier from "eslint-plugin-prettier"; import _import from "eslint-plugin-import"; import { fixupPluginRules } from "@eslint/compat"; import globals from "globals"; import typescriptEslint from "@typescript-eslint/eslint-plugin"; import path from "node:path"; import { fileURLToPath } from "node:url"; import js from "@eslint/js"; import { FlatCompat } from "@eslint/eslintrc"; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); const compat = new FlatCompat({ baseDirectory: __dirname, recommendedConfig: js.configs.recommended, allConfig: js.configs.all }); export default [...compat.extends("plugin:prettier/recommended"), { plugins: { prettier, import: fixupPluginRules(_import), }, languageOptions: { globals: { ...globals.node, }, ecmaVersion: 2018, sourceType: "commonjs", }, rules: { "accessor-pairs": "error", "array-bracket-spacing": ["error", "never"], "array-callback-return": "off", "arrow-parens": "error", "arrow-spacing": "error", "block-scoped-var": "off", "block-spacing": "off", "brace-style": ["error", "1tbs", { allowSingleLine: true, }], "capitalized-comments": "off", "class-methods-use-this": "off", "comma-dangle": "off", "comma-spacing": "off", "comma-style": ["error", "last"], complexity: "error", "computed-property-spacing": ["error", "never"], "consistent-return": "off", "consistent-this": "off", curly: "error", "default-case": "off", "dot-location": ["error", "property"], "dot-notation": "error", "eol-last": "error", eqeqeq: "off", "func-call-spacing": "error", "func-name-matching": "error", "func-names": "off", "func-style": ["error", "declaration", { allowArrowFunctions: true, }], "generator-star-spacing": "error", "global-require": "off", "guard-for-in": "error", "handle-callback-err": "off", "id-blacklist": "error", "id-length": "off", "id-match": "error", "import/extensions": "off", "init-declarations": "off", "jsx-quotes": "error", "key-spacing": "error", "keyword-spacing": ["error", { after: true, before: true, }], "line-comment-position": "off", "linebreak-style": ["error", "unix"], "lines-around-directive": "error", "max-depth": "error", "max-len": "off", "max-lines": "off", "max-nested-callbacks": "error", "max-params": "off", "max-statements": "off", "max-statements-per-line": "off", "multiline-ternary": "off", "new-cap": "off", "new-parens": "error", "newline-after-var": "off", "newline-before-return": "off", "newline-per-chained-call": "off", "no-alert": "error", "no-array-constructor": "error", "no-await-in-loop": "error", "no-bitwise": "off", "no-caller": "error", "no-catch-shadow": "off", "no-compare-neg-zero": "error", "no-confusing-arrow": "error", "no-continue": "off", "no-div-regex": "error", "no-duplicate-imports": "off", "no-else-return": "off", "no-empty-function": "off", "no-eq-null": "off", "no-eval": "error", "no-extend-native": "error", "no-extra-bind": "error", "no-extra-label": "error", "no-extra-parens": "off", "no-floating-decimal": "error", "no-implicit-globals": "error", "no-implied-eval": "error", "no-inline-comments": "off", "no-inner-declarations": ["error", "functions"], "no-invalid-this": "off", "no-iterator": "error", "no-label-var": "error", "no-labels": "error", "no-lone-blocks": "error", "no-lonely-if": "error", "no-loop-func": "error", "no-magic-numbers": "off", "no-mixed-requires": "error", "no-multi-assign": "off", "no-multi-spaces": "error", "no-multi-str": "error", "no-multiple-empty-lines": "error", "no-native-reassign": "error", "no-negated-condition": "off", "no-negated-in-lhs": "error", "no-nested-ternary": "error", "no-new": "error", "no-new-func": "error", "no-new-object": "error", "no-new-require": "error", "no-new-wrappers": "error", "no-octal-escape": "error", "no-param-reassign": "off", "no-path-concat": "error", "no-plusplus": ["error", { allowForLoopAfterthoughts: true, }], "no-process-env": "off", "no-process-exit": "error", "no-proto": "error", "no-prototype-builtins": "off", "no-restricted-globals": "error", "no-restricted-imports": "error", "no-restricted-modules": "error", "no-restricted-properties": "error", "no-restricted-syntax": "error", "no-return-assign": "error", "no-return-await": "error", "no-script-url": "error", "no-self-compare": "error", "no-sequences": "error", "no-shadow": "off", "no-shadow-restricted-names": "error", "no-spaced-func": "error", "no-sync": "error", "no-tabs": "error", "no-template-curly-in-string": "error", "no-ternary": "off", "no-throw-literal": "error", "no-trailing-spaces": "error", "no-undef-init": "error", "no-undefined": "off", "no-underscore-dangle": "off", "no-unmodified-loop-condition": "error", "no-unneeded-ternary": "error", "no-unused-expressions": "error", "no-unused-vars": ["error", { args: "none", }], "no-use-before-define": "off", "no-useless-call": "error", "no-useless-computed-key": "error", "no-useless-concat": "error", "no-useless-constructor": "error", "no-useless-escape": "off", "no-useless-rename": "error", "no-useless-return": "error", "no-var": "off", "no-void": "error", "no-warning-comments": "error", "no-whitespace-before-property": "error", "no-with": "error", "nonblock-statement-body-position": "error", "object-curly-newline": "off", "object-curly-spacing": ["error", "never"], "object-property-newline": "off", "object-shorthand": "off", "one-var": "off", "one-var-declaration-per-line": "error", "operator-assignment": ["error", "always"], "operator-linebreak": "off", "padded-blocks": "off", "prefer-arrow-callback": "off", "prefer-const": "error", "prefer-destructuring": ["error", { array: false, object: false, }], "prefer-numeric-literals": "error", "prefer-promise-reject-errors": "error", "prefer-reflect": "off", "prefer-rest-params": "off", "prefer-spread": "off", "prefer-template": "off", "quote-props": "off", quotes: ["error", "single", { avoidEscape: true, }], radix: "error", "require-await": "error", "require-jsdoc": "off", "rest-spread-spacing": "error", semi: "off", "semi-spacing": ["error", { after: true, before: false, }], "sort-imports": "off", "sort-keys": "off", "sort-vars": "error", "space-before-blocks": "error", "space-before-function-paren": "off", "space-in-parens": ["error", "never"], "space-infix-ops": "error", "space-unary-ops": "error", "spaced-comment": ["error", "always"], strict: "off", "symbol-description": "error", "template-curly-spacing": "error", "template-tag-spacing": "error", "unicode-bom": ["error", "never"], "valid-jsdoc": "off", "vars-on-top": "off", "wrap-regex": "off", "yield-star-spacing": "error", yoda: ["error", "never"], }, }, ...compat.extends( "eslint:recommended", "plugin:@typescript-eslint/eslint-recommended", "plugin:@typescript-eslint/recommended", "plugin:prettier/recommended", ).map(config => ({ ...config, files: ["**/*.ts"], })), { files: ["**/*.ts"], plugins: { "@typescript-eslint": typescriptEslint, prettier, }, rules: { "@typescript-eslint/no-use-before-define": 0, "@typescript-eslint/no-empty-interface": 0, "@typescript-eslint/no-unused-vars": 0, "@typescript-eslint/triple-slash-reference": 0, "@typescript-eslint/ban-ts-comment": "off", "@typescript-eslint/no-empty-function": 0, "@typescript-eslint/no-require-imports": 0, "@typescript-eslint/naming-convention": ["error", { selector: "default", format: ["camelCase", "UPPER_CASE", "PascalCase"], leadingUnderscore: "allow", }, { selector: "property", format: null, }], "@typescript-eslint/no-explicit-any": 0, "@typescript-eslint/explicit-function-return-type": "off", "@typescript-eslint/no-this-alias": "off", "@typescript-eslint/no-var-requires": 0, "prefer-rest-params": "off", }, }, { files: ["test/**/*.ts"], rules: { "@typescript-eslint/explicit-function-return-type": "off", }, }, { files: ["examples/cloudflare/**/*.ts", "examples/cloudflare/**/*.js", "examples/cloudflare/**/*.mjs"], ignores: [], rules: { // Disable all rules for cloudflare examples ...Object.fromEntries( Object.keys(typescriptEslint.rules).map(rule => [`@typescript-eslint/${rule}`, "off"]) ), // Disable all base rules "no-unused-vars": "off", "no-undef": "off", "no-console": "off", "require-await": "off", "prettier/prettier": "off", "func-style": "off", "no-warning-comments": "off", "no-constant-condition": "off", // Add any other rules you want to disable } }]; ``` -------------------------------------------------------------------------------- /python/stripe_agent_toolkit/functions.py: -------------------------------------------------------------------------------- ```python import stripe from typing import Optional from .configuration import Context def create_customer(context: Context, name: str, email: Optional[str] = None): """ Create a customer. Parameters: name (str): The name of the customer. email (str, optional): The email address of the customer. Returns: stripe.Customer: The created customer. """ customer_data: dict = {"name": name} if email: customer_data["email"] = email if context.get("account") is not None: account = context.get("account") if account is not None: customer_data["stripe_account"] = account customer = stripe.Customer.create(**customer_data) return {"id": customer.id} def list_customers( context: Context, email: Optional[str] = None, limit: Optional[int] = None, ): """ List Customers. Parameters: email (str, optional): The email address of the customer. limit (int, optional): The number of customers to return. Returns: stripe.ListObject: A list of customers. """ customer_data: dict = {} if email: customer_data["email"] = email if limit: customer_data["limit"] = limit if context.get("account") is not None: account = context.get("account") if account is not None: customer_data["stripe_account"] = account customers = stripe.Customer.list(**customer_data) return [{"id": customer.id} for customer in customers.data] def create_product( context: Context, name: str, description: Optional[str] = None ): """ Create a product. Parameters: name (str): The name of the product. description (str, optional): The description of the product. Returns: stripe.Product: The created product. """ product_data: dict = {"name": name} if description: product_data["description"] = description if context.get("account") is not None: account = context.get("account") if account is not None: product_data["stripe_account"] = account return stripe.Product.create(**product_data) def list_products(context: Context, limit: Optional[int] = None): """ List Products. Parameters: limit (int, optional): The number of products to return. Returns: stripe.ListObject: A list of products. """ product_data: dict = {} if limit: product_data["limit"] = limit if context.get("account") is not None: account = context.get("account") if account is not None: product_data["stripe_account"] = account return stripe.Product.list(**product_data).data def create_price( context: Context, product: str, currency: str, unit_amount: int ): """ Create a price. Parameters: product (str): The ID of the product. currency (str): The currency of the price. unit_amount (int): The unit amount of the price. Returns: stripe.Price: The created price. """ price_data: dict = { "product": product, "currency": currency, "unit_amount": unit_amount, } if context.get("account") is not None: account = context.get("account") if account is not None: price_data["stripe_account"] = account return stripe.Price.create(**price_data) def list_prices( context: Context, product: Optional[str] = None, limit: Optional[int] = None, ): """ List Prices. Parameters: product (str, optional): The ID of the product to list prices for. limit (int, optional): The number of prices to return. Returns: stripe.ListObject: A list of prices. """ prices_data: dict = {} if product: prices_data["product"] = product if limit: prices_data["limit"] = limit if context.get("account") is not None: account = context.get("account") if account is not None: prices_data["stripe_account"] = account return stripe.Price.list(**prices_data).data def create_payment_link(context: Context, price: str, quantity: int, redirect_url: Optional[str] = None): """ Create a payment link. Parameters: price (str): The ID of the price. quantity (int): The quantity of the product. redirect_url (string, optional): The URL the customer will be redirected to after the purchase is complete. Returns: stripe.PaymentLink: The created payment link. """ payment_link_data: dict = { "line_items": [{"price": price, "quantity": quantity}], } if context.get("account") is not None: account = context.get("account") if account is not None: payment_link_data["stripe_account"] = account if redirect_url: payment_link_data["after_completion"] = {"type": "redirect", "redirect": {"url": redirect_url}} payment_link = stripe.PaymentLink.create(**payment_link_data) return {"id": payment_link.id, "url": payment_link.url} def list_invoices( context: Context, customer: Optional[str] = None, limit: Optional[int] = None, ): """ List invoices. Parameters: customer (str, optional): The ID of the customer. limit (int, optional): The number of invoices to return. Returns: stripe.ListObject: A list of invoices. """ invoice_data: dict = {} if customer: invoice_data["customer"] = customer if limit: invoice_data["limit"] = limit if context.get("account") is not None: account = context.get("account") if account is not None: invoice_data["stripe_account"] = account return stripe.Invoice.list(**invoice_data).data def create_invoice(context: Context, customer: str, days_until_due: int = 30): """ Create an invoice. Parameters: customer (str): The ID of the customer. days_until_due (int, optional): The number of days until the invoice is due. Returns: stripe.Invoice: The created invoice. """ invoice_data: dict = { "customer": customer, "collection_method": "send_invoice", "days_until_due": days_until_due, } if context.get("account") is not None: account = context.get("account") if account is not None: invoice_data["stripe_account"] = account invoice = stripe.Invoice.create(**invoice_data) return { "id": invoice.id, "hosted_invoice_url": invoice.hosted_invoice_url, "customer": invoice.customer, "status": invoice.status, } def create_invoice_item( context: Context, customer: str, price: str, invoice: str ): """ Create an invoice item. Parameters: customer (str): The ID of the customer. price (str): The ID of the price. invoice (str): The ID of the invoice. Returns: stripe.InvoiceItem: The created invoice item. """ invoice_item_data: dict = { "customer": customer, "price": price, "invoice": invoice, } if context.get("account") is not None: account = context.get("account") if account is not None: invoice_item_data["stripe_account"] = account invoice_item = stripe.InvoiceItem.create(**invoice_item_data) return {"id": invoice_item.id, "invoice": invoice_item.invoice} def finalize_invoice(context: Context, invoice: str): """ Finalize an invoice. Parameters: invoice (str): The ID of the invoice. Returns: stripe.Invoice: The finalized invoice. """ invoice_data: dict = {"invoice": invoice} if context.get("account") is not None: account = context.get("account") if account is not None: invoice_data["stripe_account"] = account invoice_object = stripe.Invoice.finalize_invoice(**invoice_data) return { "id": invoice_object.id, "hosted_invoice_url": invoice_object.hosted_invoice_url, "customer": invoice_object.customer, "status": invoice_object.status, } def retrieve_balance( context: Context, ): """ Retrieve the balance. Returns: stripe.Balance: The balance. """ balance_data: dict = {} if context.get("account") is not None: account = context.get("account") if account is not None: balance_data["stripe_account"] = account return stripe.Balance.retrieve(**balance_data) def create_refund( context: Context, payment_intent: str, amount: Optional[int] = None ): """ Create a refund. Parameters: payment_intent (str): The ID of the payment intent. amount (int, optional): The amount to refund in cents. Returns: stripe.Refund: The created refund. """ refund_data: dict = { "payment_intent": payment_intent, } if amount: refund_data["amount"] = amount if context.get("account") is not None: account = context.get("account") if account is not None: refund_data["stripe_account"] = account return stripe.Refund.create(**refund_data) def list_payment_intents(context: Context, customer: Optional[str] = None, limit: Optional[int] = None): """ List payment intents. Parameters: customer (str, optional): The ID of the customer to list payment intents for. limit (int, optional): The number of payment intents to return. Returns: stripe.ListObject: A list of payment intents. """ payment_intent_data: dict = {} if customer: payment_intent_data["customer"] = customer if limit: payment_intent_data["limit"] = limit if context.get("account") is not None: account = context.get("account") if account is not None: payment_intent_data["stripe_account"] = account return stripe.PaymentIntent.list(**payment_intent_data).data def create_billing_portal_session(context: Context, customer: str, return_url: Optional[str] = None): """ Creates a session of the customer portal. Parameters: customer (str): The ID of the customer to list payment intents for. return_url (str, optional): The URL to return to after the session is complete. Returns: stripe.BillingPortalSession: The created billing portal session. """ billing_portal_session_data: dict = { "customer": customer, } if return_url: billing_portal_session_data["return_url"] = return_url if context.get("account") is not None: account = context.get("account") if account is not None: billing_portal_session_data["stripe_account"] = account session_object = stripe.billing_portal.Session.create(**billing_portal_session_data) return { "id": session_object.id, "customer": session_object.customer, "url": session_object.url, } ``` -------------------------------------------------------------------------------- /typescript/examples/cloudflare/src/utils.ts: -------------------------------------------------------------------------------- ```typescript // From: https://github.com/cloudflare/ai/blob/main/demos/remote-mcp-server/src/utils.ts // Helper to generate the layout import {html, raw} from 'hono/html'; import type {HtmlEscapedString} from 'hono/utils/html'; import type {AuthRequest} from '@cloudflare/workers-oauth-provider'; // This file mainly exists as a dumping ground for uninteresting html and CSS // to remove clutter and noise from the auth logic. You likely do not need // anything from this file. export const layout = ( content: HtmlEscapedString | string, title: string ) => html` <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8" /> <meta name="viewport" content="width=device-width, initial-scale=1.0" /> <title>${title}</title> <script src="https://cdn.tailwindcss.com"></script> <script> tailwind.config = { theme: { extend: { colors: { primary: '#3498db', secondary: '#2ecc71', accent: '#f39c12', }, fontFamily: { sans: ['Inter', 'system-ui', 'sans-serif'], heading: ['Roboto', 'system-ui', 'sans-serif'], }, }, }, }; </script> <style> @import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&family=Roboto:wght@400;500;700&display=swap'); /* Custom styling for markdown content */ .markdown h1 { font-size: 2.25rem; font-weight: 700; font-family: 'Roboto', system-ui, sans-serif; color: #1a202c; margin-bottom: 1rem; line-height: 1.2; } .markdown h2 { font-size: 1.5rem; font-weight: 600; font-family: 'Roboto', system-ui, sans-serif; color: #2d3748; margin-top: 1.5rem; margin-bottom: 0.75rem; line-height: 1.3; } .markdown h3 { font-size: 1.25rem; font-weight: 600; font-family: 'Roboto', system-ui, sans-serif; color: #2d3748; margin-top: 1.25rem; margin-bottom: 0.5rem; } .markdown p { font-size: 1.125rem; color: #4a5568; margin-bottom: 1rem; line-height: 1.6; } .markdown a { color: #3498db; font-weight: 500; text-decoration: none; } .markdown a:hover { text-decoration: underline; } .markdown blockquote { border-left: 4px solid #f39c12; padding-left: 1rem; padding-top: 0.75rem; padding-bottom: 0.75rem; margin-top: 1.5rem; margin-bottom: 1.5rem; background-color: #fffbeb; font-style: italic; } .markdown blockquote p { margin-bottom: 0.25rem; } .markdown ul, .markdown ol { margin-top: 1rem; margin-bottom: 1rem; margin-left: 1.5rem; font-size: 1.125rem; color: #4a5568; } .markdown li { margin-bottom: 0.5rem; } .markdown ul li { list-style-type: disc; } .markdown ol li { list-style-type: decimal; } .markdown pre { background-color: #f7fafc; padding: 1rem; border-radius: 0.375rem; margin-top: 1rem; margin-bottom: 1rem; overflow-x: auto; } .markdown code { font-family: monospace; font-size: 0.875rem; background-color: #f7fafc; padding: 0.125rem 0.25rem; border-radius: 0.25rem; } .markdown pre code { background-color: transparent; padding: 0; } </style> </head> <body class="bg-gray-50 text-gray-800 font-sans leading-relaxed flex flex-col min-h-screen" > <header class="bg-white shadow-sm mb-8"> <div class="container mx-auto px-4 py-4 flex justify-between items-center" > <a href="/" class="text-xl font-heading font-bold text-primary hover:text-primary/80 transition-colors" >MCP Remote Auth Demo</a > </div> </header> <main class="container mx-auto px-4 pb-12 flex-grow">${content}</main> <footer class="bg-gray-100 py-6 mt-12"> <div class="container mx-auto px-4 text-center text-gray-600"> <p> © ${new Date().getFullYear()} MCP Remote Auth Demo. All rights reserved. </p> </div> </footer> </body> </html> `; export const homeContent = async (req: Request): Promise<HtmlEscapedString> => { return html` <div class="max-w-4xl mx-auto markdown">Example Paid MCP Server</div> `; }; export const renderLoggedInAuthorizeScreen = async ( oauthScopes: {name: string; description: string}[], oauthReqInfo: AuthRequest ) => { return html` <div class="max-w-md mx-auto bg-white p-8 rounded-lg shadow-md"> <h1 class="text-2xl font-heading font-bold mb-6 text-gray-900"> Authorization Request </h1> <div class="mb-8"> <h2 class="text-lg font-semibold mb-3 text-gray-800"> MCP Remote Auth Demo would like permission to: </h2> <ul class="space-y-2"> ${oauthScopes.map( (scope) => html` <li class="flex items-start"> <span class="inline-block mr-2 mt-1 text-secondary">✓</span> <div> <p class="font-medium">${scope.name}</p> <p class="text-gray-600 text-sm">${scope.description}</p> </div> </li> ` )} </ul> </div> <form action="/approve" method="POST" class="space-y-4"> <input type="hidden" name="oauthReqInfo" value="${JSON.stringify(oauthReqInfo)}" /> <input name="email" value="[email protected]" required placeholder="Enter email" class="w-full px-3 py-2 border border-gray-300 rounded-md focus:outline-none focus:ring-2 focus:ring-primary/50 focus:border-primary" /> <button type="submit" name="action" value="approve" class="w-full py-3 px-4 bg-secondary text-white rounded-md font-medium hover:bg-secondary/90 transition-colors" > Approve </button> <button type="submit" name="action" value="reject" class="w-full py-3 px-4 border border-gray-300 text-gray-700 rounded-md font-medium hover:bg-gray-50 transition-colors" > Reject </button> </form> </div> `; }; export const renderLoggedOutAuthorizeScreen = async ( oauthScopes: {name: string; description: string}[], oauthReqInfo: AuthRequest ) => { return html` <div class="max-w-md mx-auto bg-white p-8 rounded-lg shadow-md"> <h1 class="text-2xl font-heading font-bold mb-6 text-gray-900"> Authorization Request </h1> <div class="mb-8"> <h2 class="text-lg font-semibold mb-3 text-gray-800"> MCP Remote Auth Demo would like permission to: </h2> <ul class="space-y-2"> ${oauthScopes.map( (scope) => html` <li class="flex items-start"> <span class="inline-block mr-2 mt-1 text-secondary">✓</span> <div> <p class="font-medium">${scope.name}</p> <p class="text-gray-600 text-sm">${scope.description}</p> </div> </li> ` )} </ul> </div> <form action="/approve" method="POST" class="space-y-4"> <input type="hidden" name="oauthReqInfo" value="${JSON.stringify(oauthReqInfo)}" /> <div class="space-y-4"> <div> <label for="email" class="block text-sm font-medium text-gray-700 mb-1" >Email</label > <input type="email" id="email" name="email" required class="w-full px-3 py-2 border border-gray-300 rounded-md focus:outline-none focus:ring-2 focus:ring-primary/50 focus:border-primary" /> </div> <div> <label for="password" class="block text-sm font-medium text-gray-700 mb-1" >Password</label > <input type="password" id="password" name="password" required class="w-full px-3 py-2 border border-gray-300 rounded-md focus:outline-none focus:ring-2 focus:ring-primary/50 focus:border-primary" /> </div> </div> <button type="submit" name="action" value="login_approve" class="w-full py-3 px-4 bg-primary text-white rounded-md font-medium hover:bg-primary/90 transition-colors" > Log in and Approve </button> <button type="submit" name="action" value="reject" class="w-full py-3 px-4 border border-gray-300 text-gray-700 rounded-md font-medium hover:bg-gray-50 transition-colors" > Reject </button> </form> </div> `; }; export const renderApproveContent = async ( message: string, status: string, redirectUrl: string ) => { return html` <div class="max-w-md mx-auto bg-white p-8 rounded-lg shadow-md text-center"> <div class="mb-4"> <span class="inline-block p-3 ${status === 'success' ? 'bg-green-100 text-green-800' : 'bg-red-100 text-red-800'} rounded-full" > ${status === 'success' ? '✓' : '✗'} </span> </div> <h1 class="text-2xl font-heading font-bold mb-4 text-gray-900"> ${message} </h1> <p class="mb-8 text-gray-600"> You will be redirected back to the application shortly. </p> ${raw(` <script> setTimeout(() => { window.location.href = "${redirectUrl}"; }, 1000); </script> `)} </div> `; }; export const renderAuthorizationApprovedContent = async ( redirectUrl: string ) => { return renderApproveContent( 'Authorization approved!', 'success', redirectUrl ); }; export const renderAuthorizationRejectedContent = async ( redirectUrl: string ) => { return renderApproveContent('Authorization rejected.', 'error', redirectUrl); }; export const parseApproveFormBody = async (body: { [x: string]: string | File; }) => { const action = body.action as string; const email = body.email as string; const password = body.password as string; let oauthReqInfo: AuthRequest | null = null; try { oauthReqInfo = JSON.parse(body.oauthReqInfo as string) as AuthRequest; } catch (e) { oauthReqInfo = null; } return {action, oauthReqInfo, email, password}; }; export const renderPaymentSuccessContent = async (): Promise<HtmlEscapedString> => { return html` <div class="max-w-md mx-auto bg-white p-8 rounded-lg shadow-md text-center" > <h1 class="text-2xl font-heading font-bold mb-4 text-gray-900"> Payment Successful! </h1> <p class="mb-8 text-gray-600"> You can return to the MCP client now and rerun the tool. </p> </div> `; }; ``` -------------------------------------------------------------------------------- /python/tests/test_functions.py: -------------------------------------------------------------------------------- ```python import unittest import stripe from unittest import mock from stripe_agent_toolkit.functions import ( create_customer, list_customers, create_product, list_products, create_price, list_prices, create_payment_link, list_invoices, create_invoice, create_invoice_item, finalize_invoice, retrieve_balance, create_refund, list_payment_intents, create_billing_portal_session, ) class TestStripeFunctions(unittest.TestCase): def test_create_customer(self): with mock.patch("stripe.Customer.create") as mock_function: mock_customer = {"id": "cus_123"} mock_function.return_value = stripe.Customer.construct_from( mock_customer, "sk_test_123" ) result = create_customer( context={}, name="Test User", email="[email protected]" ) mock_function.assert_called_with( name="Test User", email="[email protected]" ) self.assertEqual(result, {"id": mock_customer["id"]}) def test_create_customer_with_context(self): with mock.patch("stripe.Customer.create") as mock_function: mock_customer = {"id": "cus_123"} mock_function.return_value = stripe.Customer.construct_from( mock_customer, "sk_test_123" ) result = create_customer( context={"account": "acct_123"}, name="Test User", email="[email protected]", ) mock_function.assert_called_with( name="Test User", email="[email protected]", stripe_account="acct_123", ) self.assertEqual(result, {"id": mock_customer["id"]}) def test_list_customers(self): with mock.patch("stripe.Customer.list") as mock_function: mock_customers = [{"id": "cus_123"}, {"id": "cus_456"}] mock_function.return_value = stripe.ListObject.construct_from( { "object": "list", "data": [ stripe.Customer.construct_from( { "id": "cus_123", "email": "[email protected]", "name": "Customer One", }, "sk_test_123", ), stripe.Customer.construct_from( { "id": "cus_456", "email": "[email protected]", "name": "Customer Two", }, "sk_test_123", ), ], "has_more": False, "url": "/v1/customers", }, "sk_test_123", ) result = list_customers(context={}) mock_function.assert_called_with() self.assertEqual(result, mock_customers) def test_list_customers_with_context(self): with mock.patch("stripe.Customer.list") as mock_function: mock_customers = [{"id": "cus_123"}, {"id": "cus_456"}] mock_function.return_value = stripe.ListObject.construct_from( { "object": "list", "data": [ stripe.Customer.construct_from( { "id": "cus_123", "email": "[email protected]", "name": "Customer One", }, "sk_test_123", ), stripe.Customer.construct_from( { "id": "cus_456", "email": "[email protected]", "name": "Customer Two", }, "sk_test_123", ), ], "has_more": False, "url": "/v1/customers", }, "sk_test_123", ) result = list_customers(context={"account": "acct_123"}) mock_function.assert_called_with( stripe_account="acct_123", ) self.assertEqual(result, mock_customers) def test_create_product(self): with mock.patch("stripe.Product.create") as mock_function: mock_product = {"id": "prod_123"} mock_function.return_value = stripe.Product.construct_from( mock_product, "sk_test_123" ) result = create_product(context={}, name="Test Product") mock_function.assert_called_with( name="Test Product", ) self.assertEqual(result, {"id": mock_product["id"]}) def test_create_product_with_context(self): with mock.patch("stripe.Product.create") as mock_function: mock_product = {"id": "prod_123"} mock_function.return_value = stripe.Product.construct_from( mock_product, "sk_test_123" ) result = create_product( context={"account": "acct_123"}, name="Test Product" ) mock_function.assert_called_with( name="Test Product", stripe_account="acct_123" ) self.assertEqual(result, {"id": mock_product["id"]}) def test_list_products(self): with mock.patch("stripe.Product.list") as mock_function: mock_products = [ {"id": "prod_123", "name": "Product One"}, {"id": "prod_456", "name": "Product Two"}, ] mock_function.return_value = stripe.ListObject.construct_from( { "object": "list", "data": [ stripe.Product.construct_from( { "id": "prod_123", "name": "Product One", }, "sk_test_123", ), stripe.Product.construct_from( { "id": "prod_456", "name": "Product Two", }, "sk_test_123", ), ], "has_more": False, "url": "/v1/products", }, "sk_test_123", ) result = list_products(context={}) mock_function.assert_called_with() self.assertEqual(result, mock_products) def test_create_price(self): with mock.patch("stripe.Price.create") as mock_function: mock_price = {"id": "price_123"} mock_function.return_value = stripe.Price.construct_from( mock_price, "sk_test_123" ) result = create_price( context={}, product="prod_123", currency="usd", unit_amount=1000, ) mock_function.assert_called_with( product="prod_123", currency="usd", unit_amount=1000, ) self.assertEqual(result, {"id": mock_price["id"]}) def test_create_price_with_context(self): with mock.patch("stripe.Price.create") as mock_function: mock_price = {"id": "price_123"} mock_function.return_value = stripe.Price.construct_from( mock_price, "sk_test_123" ) result = create_price( context={"account": "acct_123"}, product="prod_123", currency="usd", unit_amount=1000, ) mock_function.assert_called_with( product="prod_123", currency="usd", unit_amount=1000, stripe_account="acct_123", ) self.assertEqual(result, {"id": mock_price["id"]}) def test_list_prices(self): with mock.patch("stripe.Price.list") as mock_function: mock_prices = [ {"id": "price_123", "product": "prod_123"}, {"id": "price_456", "product": "prod_456"}, ] mock_function.return_value = stripe.ListObject.construct_from( { "object": "list", "data": [ stripe.Price.construct_from( { "id": "price_123", "product": "prod_123", }, "sk_test_123", ), stripe.Price.construct_from( { "id": "price_456", "product": "prod_456", }, "sk_test_123", ), ], "has_more": False, "url": "/v1/prices", }, "sk_test_123", ) result = list_prices({}) mock_function.assert_called_with() self.assertEqual(result, mock_prices) def test_list_prices_with_context(self): with mock.patch("stripe.Price.list") as mock_function: mock_prices = [ {"id": "price_123", "product": "prod_123"}, {"id": "price_456", "product": "prod_456"}, ] mock_function.return_value = stripe.ListObject.construct_from( { "object": "list", "data": [ stripe.Price.construct_from( { "id": "price_123", "product": "prod_123", }, "sk_test_123", ), stripe.Price.construct_from( { "id": "price_456", "product": "prod_456", }, "sk_test_123", ), ], "has_more": False, "url": "/v1/prices", }, "sk_test_123", ) result = list_prices({"account": "acct_123"}) mock_function.assert_called_with(stripe_account="acct_123") self.assertEqual(result, mock_prices) def test_create_payment_link(self): with mock.patch("stripe.PaymentLink.create") as mock_function: mock_payment_link = {"id": "pl_123", "url": "https://example.com"} mock_function.return_value = stripe.PaymentLink.construct_from( mock_payment_link, "sk_test_123" ) result = create_payment_link( context={}, price="price_123", quantity=1 ) mock_function.assert_called_with( line_items=[{"price": "price_123", "quantity": 1}], ) self.assertEqual(result, mock_payment_link) def test_create_payment_link_with_redirect_url(self): with mock.patch("stripe.PaymentLink.create") as mock_function: mock_payment_link = {"id": "pl_123", "url": "https://example.com"} mock_function.return_value = stripe.PaymentLink.construct_from( mock_payment_link, "sk_test_123" ) result = create_payment_link( context={}, price="price_123", quantity=1, redirect_url="https://example.com" ) mock_function.assert_called_with( line_items=[{"price": "price_123", "quantity": 1, }], after_completion={"type": "redirect", "redirect": {"url": "https://example.com"}} ) self.assertEqual(result, mock_payment_link) def test_create_payment_link_with_context(self): with mock.patch("stripe.PaymentLink.create") as mock_function: mock_payment_link = {"id": "pl_123", "url": "https://example.com"} mock_function.return_value = stripe.PaymentLink.construct_from( mock_payment_link, "sk_test_123" ) result = create_payment_link( context={"account": "acct_123"}, price="price_123", quantity=1 ) mock_function.assert_called_with( line_items=[{"price": "price_123", "quantity": 1}], stripe_account="acct_123", ) self.assertEqual(result, mock_payment_link) def test_list_invoices(self): with mock.patch("stripe.Invoice.list") as mock_function: mock_invoice = { "id": "in_123", "hosted_invoice_url": "https://example.com", "customer": "cus_123", "status": "open", } mock_invoices = { "object": "list", "data": [ stripe.Invoice.construct_from( mock_invoice, "sk_test_123", ), ], "has_more": False, "url": "/v1/invoices", } mock_function.return_value = stripe.Invoice.construct_from( mock_invoices, "sk_test_123" ) result = list_invoices(context={}) mock_function.assert_called_with() self.assertEqual( result, [ { "id": mock_invoice["id"], "hosted_invoice_url": mock_invoice["hosted_invoice_url"], "customer": mock_invoice["customer"], "status": mock_invoice["status"], } ], ) def test_list_invoices_with_customer(self): with mock.patch("stripe.Invoice.list") as mock_function: mock_invoice = { "id": "in_123", "hosted_invoice_url": "https://example.com", "customer": "cus_123", "status": "open", } mock_invoices = { "object": "list", "data": [ stripe.Invoice.construct_from( mock_invoice, "sk_test_123", ), ], "has_more": False, "url": "/v1/invoices", } mock_function.return_value = stripe.Invoice.construct_from( mock_invoices, "sk_test_123" ) result = list_invoices(context={}, customer="cus_123") mock_function.assert_called_with( customer="cus_123", ) self.assertEqual( result, [ { "id": mock_invoice["id"], "hosted_invoice_url": mock_invoice["hosted_invoice_url"], "customer": mock_invoice["customer"], "status": mock_invoice["status"], } ], ) def test_list_invoices_with_customer_and_limit(self): with mock.patch("stripe.Invoice.list") as mock_function: mock_invoice = { "id": "in_123", "hosted_invoice_url": "https://example.com", "customer": "cus_123", "status": "open", } mock_invoices = { "object": "list", "data": [ stripe.Invoice.construct_from( mock_invoice, "sk_test_123", ), ], "has_more": False, "url": "/v1/invoices", } mock_function.return_value = stripe.Invoice.construct_from( mock_invoices, "sk_test_123" ) result = list_invoices(context={}, customer="cus_123", limit=100) mock_function.assert_called_with( customer="cus_123", limit=100, ) self.assertEqual( result, [ { "id": mock_invoice["id"], "hosted_invoice_url": mock_invoice["hosted_invoice_url"], "customer": mock_invoice["customer"], "status": mock_invoice["status"], } ], ) def test_list_invoices_with_context(self): with mock.patch("stripe.Invoice.list") as mock_function: mock_invoice = { "id": "in_123", "hosted_invoice_url": "https://example.com", "customer": "cus_123", "status": "open", } mock_invoices = { "object": "list", "data": [ stripe.Invoice.construct_from( mock_invoice, "sk_test_123", ), ], "has_more": False, "url": "/v1/invoices", } mock_function.return_value = stripe.Invoice.construct_from( mock_invoices, "sk_test_123" ) result = list_invoices(context={"account": "acct_123"}, customer="cus_123") mock_function.assert_called_with( customer="cus_123", stripe_account="acct_123", ) self.assertEqual( result, [ { "id": mock_invoice["id"], "hosted_invoice_url": mock_invoice["hosted_invoice_url"], "customer": mock_invoice["customer"], "status": mock_invoice["status"], } ], ) def test_create_invoice(self): with mock.patch("stripe.Invoice.create") as mock_function: mock_invoice = { "id": "in_123", "hosted_invoice_url": "https://example.com", "customer": "cus_123", "status": "open", } mock_function.return_value = stripe.Invoice.construct_from( mock_invoice, "sk_test_123" ) result = create_invoice(context={}, customer="cus_123") mock_function.assert_called_with( customer="cus_123", collection_method="send_invoice", days_until_due=30, ) self.assertEqual( result, { "id": mock_invoice["id"], "hosted_invoice_url": mock_invoice["hosted_invoice_url"], "customer": mock_invoice["customer"], "status": mock_invoice["status"], }, ) def test_create_invoice_with_context(self): with mock.patch("stripe.Invoice.create") as mock_function: mock_invoice = { "id": "in_123", "hosted_invoice_url": "https://example.com", "customer": "cus_123", "status": "open", } mock_function.return_value = stripe.Invoice.construct_from( mock_invoice, "sk_test_123" ) result = create_invoice( context={"account": "acct_123"}, customer="cus_123" ) mock_function.assert_called_with( customer="cus_123", collection_method="send_invoice", days_until_due=30, stripe_account="acct_123", ) self.assertEqual( result, { "id": mock_invoice["id"], "hosted_invoice_url": mock_invoice["hosted_invoice_url"], "customer": mock_invoice["customer"], "status": mock_invoice["status"], }, ) def test_create_invoice_item(self): with mock.patch("stripe.InvoiceItem.create") as mock_function: mock_invoice_item = {"id": "ii_123", "invoice": "in_123"} mock_function.return_value = stripe.InvoiceItem.construct_from( mock_invoice_item, "sk_test_123" ) result = create_invoice_item( context={}, customer="cus_123", price="price_123", invoice="in_123", ) mock_function.assert_called_with( customer="cus_123", price="price_123", invoice="in_123" ) self.assertEqual( result, { "id": mock_invoice_item["id"], "invoice": mock_invoice_item["invoice"], }, ) def test_create_invoice_item_with_context(self): with mock.patch("stripe.InvoiceItem.create") as mock_function: mock_invoice_item = {"id": "ii_123", "invoice": "in_123"} mock_function.return_value = stripe.InvoiceItem.construct_from( mock_invoice_item, "sk_test_123" ) result = create_invoice_item( context={"account": "acct_123"}, customer="cus_123", price="price_123", invoice="in_123", ) mock_function.assert_called_with( customer="cus_123", price="price_123", invoice="in_123", stripe_account="acct_123", ) self.assertEqual( result, { "id": mock_invoice_item["id"], "invoice": mock_invoice_item["invoice"], }, ) def test_finalize_invoice(self): with mock.patch("stripe.Invoice.finalize_invoice") as mock_function: mock_invoice = { "id": "in_123", "hosted_invoice_url": "https://example.com", "customer": "cus_123", "status": "open", } mock_function.return_value = stripe.Invoice.construct_from( mock_invoice, "sk_test_123" ) result = finalize_invoice(context={}, invoice="in_123") mock_function.assert_called_with(invoice="in_123") self.assertEqual( result, { "id": mock_invoice["id"], "hosted_invoice_url": mock_invoice["hosted_invoice_url"], "customer": mock_invoice["customer"], "status": mock_invoice["status"], }, ) def test_finalize_invoice_with_context(self): with mock.patch("stripe.Invoice.finalize_invoice") as mock_function: mock_invoice = { "id": "in_123", "hosted_invoice_url": "https://example.com", "customer": "cus_123", "status": "open", } mock_function.return_value = stripe.Invoice.construct_from( mock_invoice, "sk_test_123" ) result = finalize_invoice( context={"account": "acct_123"}, invoice="in_123" ) mock_function.assert_called_with( invoice="in_123", stripe_account="acct_123" ) self.assertEqual( result, { "id": mock_invoice["id"], "hosted_invoice_url": mock_invoice["hosted_invoice_url"], "customer": mock_invoice["customer"], "status": mock_invoice["status"], }, ) def test_retrieve_balance(self): with mock.patch("stripe.Balance.retrieve") as mock_function: mock_balance = {"available": [{"amount": 1000, "currency": "usd"}]} mock_function.return_value = stripe.Balance.construct_from( mock_balance, "sk_test_123" ) result = retrieve_balance(context={}) mock_function.assert_called_with() self.assertEqual(result, mock_balance) def test_retrieve_balance_with_context(self): with mock.patch("stripe.Balance.retrieve") as mock_function: mock_balance = {"available": [{"amount": 1000, "currency": "usd"}]} mock_function.return_value = stripe.Balance.construct_from( mock_balance, "sk_test_123" ) result = retrieve_balance(context={"account": "acct_123"}) mock_function.assert_called_with(stripe_account="acct_123") self.assertEqual(result, mock_balance) def test_create_refund(self): with mock.patch("stripe.Refund.create") as mock_function: mock_refund = {"id": "re_123"} mock_function.return_value = stripe.Refund.construct_from( mock_refund, "sk_test_123" ) result = create_refund(context={}, payment_intent="pi_123") mock_function.assert_called_with(payment_intent="pi_123") self.assertEqual(result, {"id": mock_refund["id"]}) def test_create_partial_refund(self): with mock.patch("stripe.Refund.create") as mock_function: mock_refund = {"id": "re_123"} mock_function.return_value = stripe.Refund.construct_from( mock_refund, "sk_test_123" ) result = create_refund( context={}, payment_intent="pi_123", amount=1000 ) mock_function.assert_called_with( payment_intent="pi_123", amount=1000 ) self.assertEqual(result, {"id": mock_refund["id"]}) def test_create_refund_with_context(self): with mock.patch("stripe.Refund.create") as mock_function: mock_refund = {"id": "re_123"} mock_function.return_value = stripe.Refund.construct_from( mock_refund, "sk_test_123" ) result = create_refund( context={"account": "acct_123"}, payment_intent="pi_123", amount=1000, ) mock_function.assert_called_with( payment_intent="pi_123", amount=1000, stripe_account="acct_123" ) self.assertEqual(result, {"id": mock_refund["id"]}) def test_list_payment_intents(self): with mock.patch("stripe.PaymentIntent.list") as mock_function: mock_payment_intents = [{"id": "pi_123"}, {"id": "pi_456"}] mock_function.return_value = stripe.ListObject.construct_from( {"data": mock_payment_intents}, "sk_test_123" ) result = list_payment_intents(context={}) mock_function.assert_called_with() self.assertEqual(result, mock_payment_intents) def test_list_payment_intents_with_context(self): with mock.patch("stripe.PaymentIntent.list") as mock_function: mock_payment_intents = [{"id": "pi_123"}, {"id": "pi_456"}] mock_function.return_value = stripe.ListObject.construct_from( {"data": mock_payment_intents}, "sk_test_123" ) result = list_payment_intents(context={"account": "acct_123"}) mock_function.assert_called_with(stripe_account="acct_123") self.assertEqual(result, mock_payment_intents) def test_create_billing_portal_session(self): with mock.patch("stripe.billing_portal.Session.create") as mock_function: mock_billing_portal_session = { "id": "bps_123", "url": "https://example.com", "customer": "cus_123", "configuration": "bpc_123", } mock_function.return_value = stripe.billing_portal.Session.construct_from( mock_billing_portal_session, "sk_test_123" ) result = create_billing_portal_session(context={}, customer="cus_123") mock_function.assert_called_with(customer="cus_123") self.assertEqual(result, { "id": mock_billing_portal_session["id"], "url": mock_billing_portal_session["url"], "customer": mock_billing_portal_session["customer"], }) def test_create_billing_portal_session_with_return_url(self): with mock.patch("stripe.billing_portal.Session.create") as mock_function: mock_billing_portal_session = { "id": "bps_123", "url": "https://example.com", "customer": "cus_123", "configuration": "bpc_123", } mock_function.return_value = stripe.billing_portal.Session.construct_from( mock_billing_portal_session, "sk_test_123" ) result = create_billing_portal_session( context={}, customer="cus_123", return_url="http://example.com" ) mock_function.assert_called_with( customer="cus_123", return_url="http://example.com", ) self.assertEqual(result, { "id": mock_billing_portal_session["id"], "url": mock_billing_portal_session["url"], "customer": mock_billing_portal_session["customer"], }) def test_create_billing_portal_session_with_context(self): with mock.patch("stripe.billing_portal.Session.create") as mock_function: mock_billing_portal_session = { "id": "bps_123", "url": "https://example.com", "customer": "cus_123", "configuration": "bpc_123", } mock_function.return_value = stripe.billing_portal.Session.construct_from( mock_billing_portal_session, "sk_test_123" ) result = create_billing_portal_session( context={"account": "acct_123"}, customer="cus_123", return_url="http://example.com" ) mock_function.assert_called_with( customer="cus_123", return_url="http://example.com", stripe_account="acct_123" ) self.assertEqual(result, { "id": mock_billing_portal_session["id"], "url": mock_billing_portal_session["url"], "customer": mock_billing_portal_session["customer"], }) if __name__ == "__main__": unittest.main() ```