Upstash Workflow integrates with the Vercel AI SDK to help you build durable, reliable AI applications. This allows you to:

- Build resilient AI applications with automatic retries
- Manage AI operations with workflow steps
- Implement tools and function calling with durability
- Handle errors gracefully across your AI operations
- Handle long-running AI operations with extended timeouts
This guide walks you through setting up and implementing AI features that combine Upstash Workflow’s durability guarantees with the Vercel AI SDK’s capabilities.
AI SDKs (the Vercel AI SDK, the OpenAI SDK, etc.) use the client’s default fetch implementation to make API requests, but allow you to provide a custom fetch implementation.

With Upstash Workflow, HTTP requests should go through the context.call method. We can therefore create a custom fetch implementation that routes requests through context.call. This way, Upstash Workflow makes the HTTP request and waits for the response on your behalf, even if the LLM takes a long time to respond.
The following code snippet can also be generalized to other LLM SDKs, such as those from Anthropic or Google (see the sketch after the snippet).
```typescript
import { createOpenAI } from '@ai-sdk/openai';
import { HTTPMethods } from '@upstash/qstash';
import { WorkflowAbort, WorkflowContext } from '@upstash/workflow';

export const createWorkflowOpenAI = (context: WorkflowContext) => {
  return createOpenAI({
    compatibility: "strict",
    fetch: async (input, init) => {
      try {
        // Prepare headers from init.headers
        const headers = init?.headers
          ? Object.fromEntries(new Headers(init.headers).entries())
          : {};

        // Prepare body from init.body
        const body = init?.body ? JSON.parse(init.body as string) : undefined;

        // Make network call
        const responseInfo = await context.call("openai-call-step", {
          url: input.toString(),
          method: init?.method as HTTPMethods,
          headers,
          body,
        });

        // Construct headers for the response
        const responseHeaders = new Headers(
          Object.entries(responseInfo.header).reduce((acc, [key, values]) => {
            acc[key] = values.join(", ");
            return acc;
          }, {} as Record<string, string>)
        );

        // Return the constructed response
        return new Response(JSON.stringify(responseInfo.body), {
          status: responseInfo.status,
          headers: responseHeaders,
        });
      } catch (error) {
        if (error instanceof WorkflowAbort) {
          throw error;
        } else {
          console.error("Error in fetch implementation:", error);
          throw error; // Rethrow error for further handling
        }
      }
    },
  });
};
```
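As a minimal sketch of that generalization: assuming the fetch function above is extracted into a shared createWorkflowFetch(context) helper (a hypothetical refactor, not shown here), the Anthropic provider could be wired up the same way, since createAnthropic also accepts a custom fetch in its provider settings:

```typescript
import { createAnthropic } from '@ai-sdk/anthropic';
import { WorkflowContext } from '@upstash/workflow';
// Hypothetical helper: the context.call-based fetch implementation above,
// extracted so it can be shared across provider SDKs.
import { createWorkflowFetch } from './fetch';

export const createWorkflowAnthropic = (context: WorkflowContext) =>
  createAnthropic({
    // Route all Anthropic API requests through a durable workflow step
    fetch: createWorkflowFetch(context),
  });
```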
Now that we’ve created the OpenAI client, we can use it to generate text. For that, we’ll create a workflow endpoint that uses the request payload as the prompt and generates text with this client.
```typescript
import { serve } from "@upstash/workflow/nextjs";
import { WorkflowAbort } from '@upstash/workflow';
import { generateText, ToolExecutionError } from 'ai';
import { createWorkflowOpenAI } from './model';

export const { POST } = serve<{ prompt: string }>(async (context) => {
  const openai = createWorkflowOpenAI(context);

  // Important: Must have a step before generateText
  const prompt = await context.run("get prompt", async () => {
    return context.requestPayload.prompt;
  });

  try {
    const result = await generateText({
      model: openai('gpt-3.5-turbo'),
      maxTokens: 2048,
      prompt,
    });

    await context.run("text", () => {
      console.log(`TEXT: ${result.text}`);
      return result.text;
    });
  } catch (error) {
    // WorkflowAbort is thrown internally to pause the workflow between
    // steps; rethrow it so the run can resume later.
    if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) {
      throw error.cause;
    } else {
      throw error;
    }
  }
});
```
We can either run the app locally or deploy it. Once the app is running, we can trigger the workflow using the following code:
```typescript
import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" });

const { workflowRunId } = await client.trigger({
  url: "https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>",
  body: { "prompt": "How is the weather in San Francisco around this time?" },
});
```
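Alternatively, a minimal sketch (assuming your endpoint is publicly reachable): since the serve handler treats an incoming request without workflow headers as a fresh trigger, a plain POST request to the endpoint also starts a new run:

```typescript
// A sketch: starting the workflow with a plain POST request instead of
// the Workflow client. The handler responds once the run is accepted.
const response = await fetch(
  "https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>",
  {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      prompt: "How is the weather in San Francisco around this time?",
    }),
  },
);
```

Using client.trigger is generally preferable, though, since it returns the workflowRunId for tracking the run.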
Tools allow the AI model to perform specific actions during text generation. You can learn more about tools in the Vercel AI SDK documentation.
When using tools with Upstash Workflow, each tool’s execute function must be wrapped in a workflow step (context.run). This way, tool results are persisted and are not re-executed when the workflow route is re-invoked.

The maxSteps parameter must be greater than 1 when using tools, so that the model can process the tool results and generate a final response. See the tool steps documentation for a detailed explanation.
```typescript
import { z } from 'zod';
import { serve } from "@upstash/workflow/nextjs";
import { WorkflowAbort } from '@upstash/workflow';
import { generateText, ToolExecutionError, tool } from 'ai';
import { createWorkflowOpenAI } from './model';

export const { POST } = serve<{ prompt: string }>(async (context) => {
  const openai = createWorkflowOpenAI(context);

  const prompt = await context.run("get prompt", async () => {
    return context.requestPayload.prompt;
  });

  try {
    const result = await generateText({
      model: openai('gpt-3.5-turbo'),
      tools: {
        weather: tool({
          description: 'Get the weather in a location',
          parameters: z.object({
            location: z.string().describe('The location to get the weather for'),
          }),
          // Wrap the tool's execution in a workflow step so its result
          // is persisted and not re-executed on replay
          execute: ({ location }) =>
            context.run("weather tool", () => {
              // Mock data, replace with actual weather API call
              return {
                location,
                temperature: 72 + Math.floor(Math.random() * 21) - 10,
              };
            }),
        }),
      },
      maxSteps: 2,
      prompt,
    });

    await context.run("text", () => {
      console.log(`TEXT: ${result.text}`);
      return result.text;
    });
  } catch (error) {
    if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) {
      throw error.cause;
    } else {
      throw error;
    }
  }
});
```
When the workflow is triggered with the same prompt as above, the logs show the weather tool step running first, followed by the final text generated from the tool result.
The most critical requirement is that generateText must not be the first operation in the workflow: always run at least one workflow step before calling it. This can simply be the step that reads the prompt from the request payload, as in the examples above:
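```typescript
export const { POST } = serve<{ prompt: string }>(async (context) => {
  const openai = createWorkflowOpenAI(context);

  // A workflow step runs before generateText, so the call succeeds
  const prompt = await context.run("get prompt", async () => {
    return context.requestPayload.prompt;
  });

  const result = await generateText({
    model: openai('gpt-3.5-turbo'),
    prompt,
  });
});
```

By contrast, calling generateText directly with the request payload, without any step before it, fails: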
```typescript
export const { POST } = serve<{ prompt: string }>(async (context) => {
  const openai = createWorkflowOpenAI(context);

  // Will throw "prompt is undefined"
  const result = await generateText({
    model: openai('gpt-3.5-turbo'),
    prompt: context.requestPayload.prompt,
  });
});
```