Orchestration
Queues
- Serial execution ensures agents process one task at a time
- Durable messages survive sleep and restart
- Completable messages for request/response patterns with agents
- Backpressure absorbs bursts and prevents overload
Queue agent commands
Section titled “Queue agent commands”Use actor queues to serialize work that an agent processes one task at a time.
import { agentOs } from "rivetkit/agent-os";import common from "@rivet-dev/agent-os-common";import pi from "@rivet-dev/agent-os-pi";import { actor, queue, setup } from "rivetkit";
const taskRunner = actor({ queues: { tasks: queue<{ prompt: string }>(), }, run: async (c) => { const agentHandle = c.actors.vm.getOrCreate(["task-agent"]);
for await (const message of c.queue.iter()) { // Process one task at a time const session = await agentHandle.createSession("pi", { env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! }, }); await agentHandle.sendPrompt(session.sessionId, message.body.prompt); await agentHandle.closeSession(session.sessionId); } },});
const vm = agentOs({ options: { software: [common, pi] },});
export const registry = setup({ use: { taskRunner, vm } });registry.start();import { createClient } from "rivetkit/client";import type { registry } from "./server";
const client = createClient<typeof registry>("http://localhost:6420");const handle = client.taskRunner.getOrCreate(["main"]);
// Queue up work. Tasks are processed one at a time.await handle.send("tasks", { prompt: "Review PR #123" });await handle.send("tasks", { prompt: "Fix the flaky test in auth.test.ts" });await handle.send("tasks", { prompt: "Update the README" });Request/response with completable messages
Section titled “Request/response with completable messages”Use completable messages when the caller needs to wait for the agent to finish.
import { agentOs } from "rivetkit/agent-os";import common from "@rivet-dev/agent-os-common";import pi from "@rivet-dev/agent-os-pi";import { actor, queue, setup } from "rivetkit";
const reviewer = actor({ queues: { review: queue<{ file: string }, { summary: string }>(), }, run: async (c) => { const agentHandle = c.actors.vm.getOrCreate(["reviewer"]); const session = await agentHandle.createSession("pi", { env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! }, });
for await (const message of c.queue.iter({ completable: true })) { const content = await agentHandle.readFile(message.body.file); const text = new TextDecoder().decode(content);
await agentHandle.sendPrompt( session.sessionId, `Review this code and write a summary to /home/user/review.txt:\n\n${text}`, );
const review = await agentHandle.readFile("/home/user/review.txt"); await message.complete({ summary: new TextDecoder().decode(review), }); } },});
const vm = agentOs({ options: { software: [common, pi] },});
export const registry = setup({ use: { reviewer, vm } });registry.start();import { createClient } from "rivetkit/client";import type { registry } from "./server";
const client = createClient<typeof registry>("http://localhost:6420");const handle = client.reviewer.getOrCreate(["main"]);
// Wait for the agent to complete the reviewconst result = await handle.send( "review", { file: "/home/user/src/auth.ts" }, { wait: true, timeout: 120_000 },);
if (result.status === "completed") { console.log("Review:", result.response.summary);}Ingesting from external systems
Section titled “Ingesting from external systems”Accept tasks from webhooks, APIs, or other services and queue them for agent processing.
import { agentOs } from "rivetkit/agent-os";import common from "@rivet-dev/agent-os-common";import pi from "@rivet-dev/agent-os-pi";import { actor, queue, setup } from "rivetkit";
const issueWorker = actor({ queues: { issues: queue<{ title: string; body: string }>(), }, actions: { // HTTP endpoint to receive webhook payloads ingestIssue: async (c, title: string, body: string) => { await c.queue.push("issues", { title, body }); }, }, run: async (c) => { const agentHandle = c.actors.vm.getOrCreate(["issue-worker"]);
for await (const message of c.queue.iter()) { const session = await agentHandle.createSession("pi", { env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! }, }); await agentHandle.sendPrompt( session.sessionId, `Investigate and fix this issue:\n\nTitle: ${message.body.title}\n\n${message.body.body}`, ); await agentHandle.closeSession(session.sessionId); } },});
const vm = agentOs({ options: { software: [common, pi] },});
export const registry = setup({ use: { issueWorker, vm } });registry.start();import { createClient } from "rivetkit/client";import type { registry } from "./server";
const client = createClient<typeof registry>("http://localhost:6420");const handle = client.issueWorker.getOrCreate(["main"]);
// Ingest from a webhook or external systemawait handle.ingestIssue( "Login redirect broken", "Users are redirected to /undefined after login on mobile",);Recommendations
Section titled “Recommendations”- Use queues when you need guaranteed serial execution. Agents process one message at a time, preventing race conditions.
- Use completable messages when the caller needs the result. Set a generous timeout since agent work can take minutes.
- Queues survive actor sleep. Messages are persisted and processed when the actor wakes up.
- See Queues & Run Loops for the full queue API reference.