Skip to content
GitHub Get Started
Orchestration

Queues

  • Serial execution ensures agents process one task at a time
  • Durable messages survive sleep and restart
  • Completable messages for request/response patterns with agents
  • Backpressure absorbs bursts and prevents overload

Use actor queues to serialize work that an agent processes one task at a time.

server.ts
import { agentOs } from "rivetkit/agent-os";
import common from "@rivet-dev/agent-os-common";
import pi from "@rivet-dev/agent-os-pi";
import { actor, queue, setup } from "rivetkit";
const taskRunner = actor({
queues: {
tasks: queue<{ prompt: string }>(),
},
run: async (c) => {
const agentHandle = c.actors.vm.getOrCreate(["task-agent"]);
for await (const message of c.queue.iter()) {
// Process one task at a time
const session = await agentHandle.createSession("pi", {
env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
});
await agentHandle.sendPrompt(session.sessionId, message.body.prompt);
await agentHandle.closeSession(session.sessionId);
}
},
});
const vm = agentOs({
options: { software: [common, pi] },
});
export const registry = setup({ use: { taskRunner, vm } });
registry.start();

Request/response with completable messages

Section titled “Request/response with completable messages”

Use completable messages when the caller needs to wait for the agent to finish.

server.ts
import { agentOs } from "rivetkit/agent-os";
import common from "@rivet-dev/agent-os-common";
import pi from "@rivet-dev/agent-os-pi";
import { actor, queue, setup } from "rivetkit";
const reviewer = actor({
queues: {
review: queue<{ file: string }, { summary: string }>(),
},
run: async (c) => {
const agentHandle = c.actors.vm.getOrCreate(["reviewer"]);
const session = await agentHandle.createSession("pi", {
env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
});
for await (const message of c.queue.iter({ completable: true })) {
const content = await agentHandle.readFile(message.body.file);
const text = new TextDecoder().decode(content);
await agentHandle.sendPrompt(
session.sessionId,
`Review this code and write a summary to /home/user/review.txt:\n\n${text}`,
);
const review = await agentHandle.readFile("/home/user/review.txt");
await message.complete({
summary: new TextDecoder().decode(review),
});
}
},
});
const vm = agentOs({
options: { software: [common, pi] },
});
export const registry = setup({ use: { reviewer, vm } });
registry.start();

Accept tasks from webhooks, APIs, or other services and queue them for agent processing.

server.ts
import { agentOs } from "rivetkit/agent-os";
import common from "@rivet-dev/agent-os-common";
import pi from "@rivet-dev/agent-os-pi";
import { actor, queue, setup } from "rivetkit";
const issueWorker = actor({
queues: {
issues: queue<{ title: string; body: string }>(),
},
actions: {
// HTTP endpoint to receive webhook payloads
ingestIssue: async (c, title: string, body: string) => {
await c.queue.push("issues", { title, body });
},
},
run: async (c) => {
const agentHandle = c.actors.vm.getOrCreate(["issue-worker"]);
for await (const message of c.queue.iter()) {
const session = await agentHandle.createSession("pi", {
env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
});
await agentHandle.sendPrompt(
session.sessionId,
`Investigate and fix this issue:\n\nTitle: ${message.body.title}\n\n${message.body.body}`,
);
await agentHandle.closeSession(session.sessionId);
}
},
});
const vm = agentOs({
options: { software: [common, pi] },
});
export const registry = setup({ use: { issueWorker, vm } });
registry.start();
  • Use queues when you need guaranteed serial execution. Agents process one message at a time, preventing race conditions.
  • Use completable messages when the caller needs the result. Set a generous timeout since agent work can take minutes.
  • Queues survive actor sleep. Messages are persisted and processed when the actor wakes up.
  • See Queues & Run Loops for the full queue API reference.