Orchestration
Workflow Automation
- Durable workflows that survive crashes and restarts
- Multi-step orchestration with sessions, file operations, and process execution
- Error handling and retry via ctx.step() for each operation
- Agent chaining where output of one session feeds into the next
Basic workflow
Use the actor workflow() primitive to orchestrate a multi-step agent task. Each step is durable and will resume from where it left off after a restart.
Session creation and prompting must happen within the same step because sessions are ephemeral and won’t survive a replay.
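To see why a session should not be shared across steps, here is a minimal toy model of step replay. It is a sketch under stated assumptions, not rivetkit's implementation: `Journal`, `runStep`, `liveSessions`, and `splitWorkflow` are hypothetical names, the journal is keyed by step name, and steps are synchronous for brevity.

```typescript
// Toy model of durable steps. NOT rivetkit's implementation; every name here
// (Journal, runStep, liveSessions, splitWorkflow) is hypothetical.
type Journal = Map<string, string>; // step name -> JSON-encoded result

// Run a step once. On replay, return the recorded result without re-running it.
function runStep<T>(journal: Journal, name: string, fn: () => T): T {
  const recorded = journal.get(name);
  if (recorded !== undefined) {
    return JSON.parse(recorded) as T; // replay: the side effect is skipped
  }
  const result = fn();
  journal.set(name, JSON.stringify(result ?? null));
  return result;
}

// Sessions live only in process memory, so a restart loses them.
const liveSessions = new Set<string>();

// Anti-pattern: create the session in one step and use it in another.
function splitWorkflow(journal: Journal, crashAfterCreate: boolean): string {
  const sessionId = runStep(journal, "create-session", () => {
    const id = "session-1";
    liveSessions.add(id);
    return id;
  });
  if (crashAfterCreate) throw new Error("simulated crash");
  return runStep(journal, "prompt", () =>
    liveSessions.has(sessionId) ? "prompted" : "session missing",
  );
}

const journal: Journal = new Map();
try {
  splitWorkflow(journal, true); // first run dies between the two steps
} catch {
  // the process would exit here
}
liveSessions.clear(); // restart: in-memory sessions are gone, the journal persists
const outcome = splitWorkflow(journal, false); // replay skips "create-session"
console.log(outcome); // prints "session missing"
```

In this model the replay returns the recorded session ID, but the session it names no longer exists. Creating, prompting, and closing the session inside one step avoids that, because the whole step re-runs as a unit.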
// Server
import { agentOs } from "rivetkit/agent-os";
import common from "@rivet-dev/agent-os-common";
import pi from "@rivet-dev/agent-os-pi";
import { actor, setup, workflow } from "rivetkit";

const automator = actor({
  workflows: {
    fixBug: workflow<{ repo: string; issue: string }>(),
  },
  run: async (c) => {
    for await (const message of c.workflow.iter("fixBug")) {
      const { repo, issue } = message.body;
      const agentHandle = c.actors.vm.getOrCreate([`fix-${issue}`]);

      // Step 1: Clone the repo
      await c.step("clone-repo", async (c) => {
        return agentHandle.exec(`git clone ${repo} /home/user/repo`);
      });

      // Step 2: Agent fixes the bug (session lives within this step)
      await c.step("fix-bug", async (c) => {
        const session = await agentHandle.createSession("pi", {
          env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
        });
        await agentHandle.sendPrompt(
          session.sessionId,
          `Fix the bug described in issue: ${issue}`,
        );
        await agentHandle.closeSession(session.sessionId);
      });

      // Step 3: Run tests
      const tests = await c.step("run-tests", async (c) => {
        return agentHandle.exec("cd /home/user/repo && npm test");
      });

      console.log("Tests exit code:", tests.exitCode);
      await message.complete();
    }
  },
});

const vm = agentOs({
  options: { software: [common, pi] },
});

export const registry = setup({ use: { automator, vm } });
registry.start();

// Client
import { createClient } from "rivetkit/client";
import type { registry } from "./server";

const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.automator.getOrCreate(["main"]);

// Trigger the workflow
await handle.send("fixBug", {
  repo: "https://github.com/example/repo.git",
  issue: "Fix the login redirect bug",
});

Agent chaining
Output of one agent session feeds into the next. Each session is created and completed within its own step.
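The handoff between sessions travels through a step's return value, so that value has to survive being persisted. The sketch below illustrates why decoding file bytes to a string before returning is the safer choice. It assumes step results are stored in a JSON-like encoding, which this page does not state; the sample review text is made up.

```typescript
// Assumption: step results are persisted in a JSON-like form. The actual
// encoding used by rivetkit is not stated here, so treat this as illustrative.
const bytes = new TextEncoder().encode("## Findings\n- missing null check");

// Returning raw bytes: a JSON round trip turns the Uint8Array into a plain
// object with numeric keys, so the type information is lost.
const roundTripped = JSON.parse(JSON.stringify(bytes));
console.log(roundTripped instanceof Uint8Array); // prints false

// Returning a decoded string: the value survives the round trip unchanged.
const text = new TextDecoder().decode(bytes);
const restored: string = JSON.parse(JSON.stringify(text));
console.log(restored === text); // prints true

// The restored string can be interpolated straight into the next prompt.
const nextPrompt = `Apply the following review feedback:\n\n${restored}`;
console.log(nextPrompt.startsWith("Apply the following")); // prints true
```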
// Server
import { agentOs } from "rivetkit/agent-os";
import common from "@rivet-dev/agent-os-common";
import pi from "@rivet-dev/agent-os-pi";
import { actor, setup, workflow } from "rivetkit";

const pipeline = actor({
  workflows: {
    codeReview: workflow<{ filePath: string }>(),
  },
  run: async (c) => {
    for await (const message of c.workflow.iter("codeReview")) {
      const agentHandle = c.actors.vm.getOrCreate([`review-${Date.now()}`]);

      // Step 1: Agent reviews code and writes findings to a file
      await c.step("review", async (c) => {
        const session = await agentHandle.createSession("pi", {
          env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
        });
        await agentHandle.sendPrompt(
          session.sessionId,
          `Review the code at ${message.body.filePath} and write your findings to /home/user/review.md`,
        );
        await agentHandle.closeSession(session.sessionId);
      });

      // Step 2: Read the review from the filesystem
      const review = await c.step("read-review", async (c) => {
        const content = await agentHandle.readFile("/home/user/review.md");
        return new TextDecoder().decode(content);
      });

      // Step 3: Second session applies fixes based on the review
      await c.step("fix", async (c) => {
        const session = await agentHandle.createSession("pi", {
          env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
        });
        await agentHandle.sendPrompt(
          session.sessionId,
          `Apply the following review feedback:\n\n${review}`,
        );
        await agentHandle.closeSession(session.sessionId);
      });

      await message.complete();
    }
  },
});

const vm = agentOs({
  options: { software: [common, pi] },
});

export const registry = setup({ use: { pipeline, vm } });
registry.start();

// Client
import { createClient } from "rivetkit/client";
import type { registry } from "./server";

const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.pipeline.getOrCreate(["main"]);

await handle.send("codeReview", { filePath: "/home/user/src/auth.ts" });

Recommendations
- Create and close sessions within the same step. Sessions are ephemeral and won’t exist after a workflow replays.
- Pass data between steps via the filesystem or step return values, not session state.
- Keep step names stable across code changes. Renaming a step breaks replay for in-progress workflows.
- Use separate actors for the workflow orchestrator and the agentOS VM.
- See Workflows for the full workflow API reference including timers, joins, and races.
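The advice on stable step names can be illustrated with a toy replayer that checks recorded history in order. This is one plausible way a replay engine might detect a rename, not a description of rivetkit's internals; `Entry` and `makeReplayer` are hypothetical names, and steps are synchronous for brevity.

```typescript
// Toy ordered-history replayer. NOT rivetkit's implementation; Entry and
// makeReplayer are hypothetical names used only for illustration.
type Entry = { name: string; result: unknown };

function makeReplayer(history: Entry[]) {
  let cursor = 0;
  return function step<T>(name: string, fn: () => T): T {
    const entry = history[cursor];
    cursor += 1;
    if (entry === undefined) {
      // No recorded entry at this position: run the step and record it.
      const result = fn();
      history.push({ name, result });
      return result;
    }
    if (entry.name !== name) {
      // The code asks for a step the history does not contain at this position.
      throw new Error(
        `history mismatch: recorded "${entry.name}", code asked for "${name}"`,
      );
    }
    return entry.result as T; // replay: reuse the recorded result
  };
}

const history: Entry[] = [];
let runs = 0;
const cloneRepo = () => {
  runs += 1;
  return 0; // stand-in for an exit code
};

// First run executes the step and records it under "clone-repo".
makeReplayer(history)("clone-repo", cloneRepo);

// Replay with the same name reuses the recorded result.
const exitCode = makeReplayer(history)("clone-repo", cloneRepo);

// Replay after renaming the step no longer matches the recorded history.
let mismatch = "";
try {
  makeReplayer(history)("clone-repository", cloneRepo);
} catch (err) {
  mismatch = (err as Error).message;
}
console.log(runs, exitCode); // prints 1 0
console.log(mismatch); // prints history mismatch: recorded "clone-repo", code asked for "clone-repository"
```

A real engine may handle the mismatch differently, for example by re-running the renamed step. Either way, an in-progress workflow stops lining up with its recorded history, which is why the names should stay fixed once workflows are in flight.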