tx send / tx inbox
Channel-based agent-to-agent messaging
Purpose
tx send and tx inbox provide channel-based messaging for agent-to-agent communication. Messages are persistent, cursor-based, and support acknowledgment for reliable delivery.
Key properties:
- Channel-based: Messages are organized into named channels (e.g., agent ID, topic, task:tx-abc123)
- Cursor-based fan-out: Multiple consumers read from the same channel using afterId cursors
- Read-only inbox: Reading messages has no side effects -- explicit tx ack is required
- TTL support: Messages can auto-expire after a configurable duration
- Correlation IDs: Request/reply patterns via correlationId matching
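The properties above can be illustrated with a small in-memory model. This is only a sketch of the described semantics, not how tx is implemented: the ChannelStore class, the Msg type, and every method name are hypothetical and exist only for this example.

```typescript
// Illustrative in-memory model of the semantics listed above.
// NOT tx's implementation; every name here is hypothetical.
type Status = "pending" | "acked";

interface Msg {
  id: number;
  channel: string;
  content: string;
  status: Status;
}

class ChannelStore {
  private nextId = 1;
  private messages: Msg[] = [];

  // Append a pending message to a named channel.
  send(channel: string, content: string): Msg {
    const msg: Msg = { id: this.nextId++, channel, content, status: "pending" };
    this.messages.push(msg);
    return msg;
  }

  // Pure read: returns copies and never changes message status.
  inbox(channel: string, afterId = 0, includeAcked = false): Msg[] {
    return this.messages
      .filter((m) => m.channel === channel && m.id > afterId)
      .filter((m) => includeAcked || m.status === "pending")
      .map((m) => ({ ...m }));
  }

  // Explicit acknowledgment is the only operation that changes status.
  ack(id: number): void {
    const msg = this.messages.find((m) => m.id === id);
    if (msg) msg.status = "acked";
  }
}

const store = new ChannelStore();
store.send("shared-work", "first");
store.send("shared-work", "second");

// Two consumers keep separate cursors over the same channel.
const seenByA = store.inbox("shared-work", 0); // cursor 0: both messages
const seenByB = store.inbox("shared-work", 1); // cursor 1: only the second
```

Because reads never mutate state, each consumer's progress is determined by the cursor it holds, and a message stays pending until some consumer acknowledges it explicitly.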
At A Glance
Read Without Consume
Message State
Send a Message
tx send <channel> <content> [options]
Arguments
| Argument | Required | Description |
|---|---|---|
| <channel> | Yes | Channel name (e.g., agent-1, task:tx-abc123) |
| <content> | Yes | Message content |
Options
| Option | Description |
|---|---|
| --sender <s> | Sender name (default: cli) |
| --task <id> | Associated task ID |
| --ttl <sec> | Time-to-live in seconds |
| --correlation <id> | Correlation ID for request/reply |
| --metadata '{}' | JSON metadata string |
| --json | Output as JSON |
Examples
# Send a simple message
tx send agent-1 "Task tx-abc123 is ready for review"
# Send with metadata and TTL
tx send builds "Deploy v2.3.1 complete" --sender ci --ttl 3600 --metadata '{"version":"2.3.1"}'
# Send with correlation for request/reply
tx send planner "Need architecture review" --correlation req-001
# JSON output
tx send agent-1 "hello" --json
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'
const tx = new TxClient({ apiUrl: 'http://localhost:3456' })
const message = await tx.messages.send({
channel: 'agent-1',
content: 'Task tx-abc123 is ready for review',
sender: 'orchestrator',
ttlSeconds: 3600
})
// Returns: { id, channel, sender, content, status, correlationId, taskId, metadata, createdAt, ackedAt, expiresAt }
{
"tool": "tx_send",
"arguments": {
"channel": "agent-1",
"content": "Task tx-abc123 is ready for review",
"sender": "planner",
"taskId": "tx-abc123",
"ttlSeconds": 3600,
"correlationId": "req-001",
"metadata": "{\"priority\":\"high\"}"
}
}
Parameters
| Parameter | Required | Description |
|---|---|---|
| channel | Yes | Channel name |
| content | Yes | Message content |
| sender | No | Sender name (default: mcp) |
| taskId | No | Associated task ID |
| ttlSeconds | No | Time-to-live in seconds |
| correlationId | No | Correlation ID for request/reply |
| metadata | No | JSON metadata string |
POST /api/messages
Content-Type: application/json
{
"channel": "agent-1",
"content": "Task tx-abc123 is ready for review",
"sender": "orchestrator",
"taskId": "tx-abc123",
"ttlSeconds": 3600,
"correlationId": "req-001",
"metadata": { "priority": "high" }
}
Response (201)
{
"id": 1,
"channel": "agent-1",
"sender": "orchestrator",
"content": "Task tx-abc123 is ready for review",
"status": "pending",
"correlationId": "req-001",
"taskId": "tx-abc123",
"metadata": { "priority": "high" },
"createdAt": "2025-01-28T10:00:00.000Z",
"ackedAt": null,
"expiresAt": "2025-01-28T11:00:00.000Z"
}
Example
curl -X POST http://localhost:3456/api/messages \
-H "Content-Type: application/json" \
-d '{"channel": "agent-1", "content": "hello", "sender": "orchestrator"}'
Read Inbox
Read messages from a channel. This is a pure read with no side effects -- messages are not marked as read or consumed.
tx inbox <channel> [options]
Arguments
| Argument | Required | Description |
|---|---|---|
| <channel> | Yes | Channel to read from |
Options
| Option | Description |
|---|---|
| --after <id> | Cursor: only messages with ID greater than this value |
| --limit <n> | Max messages to return |
| --sender <s> | Filter by sender |
| --correlation <id> | Filter by correlation ID |
| --include-acked | Include acknowledged messages |
| --json | Output as JSON |
Examples
# Read all pending messages
tx inbox agent-1
# Cursor-based pagination
tx inbox agent-1 --after 42
# Filter by sender
tx inbox agent-1 --sender planner
# Include already-acknowledged messages
tx inbox agent-1 --include-acked
# JSON output for scripting
tx inbox agent-1 --json
Output
[1] 2025-01-28 10:00:00 orchestrator: Task tx-abc123 is ready for review
[2] 2025-01-28 10:05:00 ci: Build passed for branch feature/auth
2 message(s)
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'
const tx = new TxClient({ apiUrl: 'http://localhost:3456' })
const messages = await tx.messages.inbox('agent-1', { limit: 10, afterId: 42 })
// messages: Array<{ id, channel, sender, content, status, ... }>
{
"tool": "tx_inbox",
"arguments": {
"channel": "agent-1",
"afterId": 42,
"limit": 10,
"sender": "planner",
"includeAcked": false
}
}
Parameters
| Parameter | Required | Description |
|---|---|---|
| channel | Yes | Channel to read from |
| afterId | No | Cursor: only messages with ID > this value |
| limit | No | Max messages (default: 50) |
| sender | No | Filter by sender |
| correlationId | No | Filter by correlation ID |
| includeAcked | No | Include acknowledged messages |
GET /api/messages/inbox/:channel
Query Parameters
| Parameter | Description |
|---|---|
| afterId | Cursor: only messages with ID > this value |
| limit | Max messages to return |
| sender | Filter by sender |
| correlationId | Filter by correlation ID |
| includeAcked | Include acknowledged messages (true/false) |
Response (200)
{
"messages": [
{
"id": 1,
"channel": "agent-1",
"sender": "orchestrator",
"content": "Task tx-abc123 is ready for review",
"status": "pending",
"correlationId": null,
"taskId": null,
"metadata": {},
"createdAt": "2025-01-28T10:00:00.000Z",
"ackedAt": null,
"expiresAt": null
}
],
"channel": "agent-1",
"count": 1
}
Example
# Quote the URL so the shell does not treat & as a background operator
curl "http://localhost:3456/api/messages/inbox/agent-1?limit=10&afterId=0"
Acknowledge a Message
Mark a single message as acknowledged. This transitions its status from pending to acked.
tx ack <message-id> [options]
Arguments
| Argument | Required | Description |
|---|---|---|
| <message-id> | Yes | Numeric message ID |
Options
| Option | Description |
|---|---|
| --json | Output as JSON |
Examples
tx ack 42
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'
const tx = new TxClient({ apiUrl: 'http://localhost:3456' })
const message = await tx.messages.ack(42)
{
"tool": "tx_ack",
"arguments": {
"id": 42
}
}
POST /api/messages/:id/ack
Response (200)
{
"message": {
"id": 42,
"channel": "agent-1",
"sender": "orchestrator",
"content": "Task tx-abc123 is ready for review",
"status": "acked",
"correlationId": null,
"taskId": null,
"metadata": {},
"createdAt": "2025-01-28T10:00:00.000Z",
"ackedAt": "2025-01-28T10:15:00.000Z",
"expiresAt": null
}
}
Example
curl -X POST http://localhost:3456/api/messages/42/ack
Acknowledge All Messages
Acknowledge all pending messages on a channel at once.
tx ack all <channel> [options]
Arguments
| Argument | Required | Description |
|---|---|---|
| <channel> | Yes | Channel to acknowledge |
Options
| Option | Description |
|---|---|
| --json | Output as JSON |
Examples
tx ack all agent-1
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'
const tx = new TxClient({ apiUrl: 'http://localhost:3456' })
const { channel, ackedCount } = await tx.messages.ackAll('agent-1')
{
"tool": "tx_ack_all",
"arguments": {
"channel": "agent-1"
}
}
POST /api/messages/inbox/:channel/ack
Response (200)
{
"channel": "agent-1",
"ackedCount": 5
}
Example
curl -X POST http://localhost:3456/api/messages/inbox/agent-1/ack
Pending Count
Count unacknowledged messages on a channel.
tx outbox pending <channel> [options]
Options
| Option | Description |
|---|---|
| --json | Output as JSON |
Examples
tx outbox pending agent-1
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'
const tx = new TxClient({ apiUrl: 'http://localhost:3456' })
const count = await tx.messages.pending('agent-1')
{
"tool": "tx_outbox_pending",
"arguments": {
"channel": "agent-1"
}
}
GET /api/messages/inbox/:channel/count
Response (200)
{
"channel": "agent-1",
"count": 3
}
Example
curl http://localhost:3456/api/messages/inbox/agent-1/count
Garbage Collection
Clean up expired and old acknowledged messages.
tx outbox gc [options]
Options
| Option | Description |
|---|---|
| --acked-older-than <hours> | Remove acked messages older than N hours |
| --json | Output as JSON |
Examples
# Remove expired + acked messages older than 24 hours
tx outbox gc --acked-older-than 24
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'
const tx = new TxClient({ apiUrl: 'http://localhost:3456' })
const { expired, acked } = await tx.messages.gc({ ackedOlderThanHours: 24 })
There is no dedicated MCP tool for garbage collection. Use the REST API or CLI:
{
"note": "Use REST API endpoint POST /api/messages/gc or CLI: tx outbox gc"
}
POST /api/messages/gc
Content-Type: application/json
{
"ackedOlderThanHours": 24
}
Response (200)
{
"expired": 3,
"acked": 12
}
Example
curl -X POST http://localhost:3456/api/messages/gc \
-H "Content-Type: application/json" \
-d '{"ackedOlderThanHours": 24}'
Message Schema
Every message contains:
| Field | Type | Description |
|---|---|---|
| id | number | Auto-incrementing message ID |
| channel | string | Channel the message belongs to |
| sender | string | Who sent the message |
| content | string | Message body |
| status | "pending" \| "acked" | Current status |
| correlationId | string \| null | For request/reply patterns |
| taskId | string \| null | Associated task ID |
| metadata | Record<string, unknown> | Arbitrary key-value data |
| createdAt | string | ISO 8601 timestamp |
| ackedAt | string \| null | When the message was acknowledged |
| expiresAt | string \| null | When the message expires (from TTL) |
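For typed consumers, the field table can be written down as a TypeScript interface. The sketch below is a local declaration for illustration, not a type exported by the SDK. The two helpers are hypothetical: they assume that expiresAt equals createdAt plus the TTL in seconds, which is consistent with the REST example on this page (created at 10:00, ttlSeconds 3600, expires at 11:00), and that a message counts as expired once the current time reaches expiresAt.

```typescript
// Shape taken from the field table above (local declaration, not an SDK export).
interface Message {
  id: number;
  channel: string;
  sender: string;
  content: string;
  status: "pending" | "acked";
  correlationId: string | null;
  taskId: string | null;
  metadata: Record<string, unknown>;
  createdAt: string; // ISO 8601
  ackedAt: string | null;
  expiresAt: string | null;
}

// Assumption: expiry is createdAt plus the TTL in seconds.
function expiresAtFromTtl(createdAt: string, ttlSeconds: number): string {
  return new Date(Date.parse(createdAt) + ttlSeconds * 1000).toISOString();
}

// Assumption: a message with no expiresAt never expires, and the
// boundary instant itself counts as expired.
function isExpired(msg: Message, now: Date = new Date()): boolean {
  return msg.expiresAt !== null && Date.parse(msg.expiresAt) <= now.getTime();
}

// Values copied from the POST /api/messages response example.
const example: Message = {
  id: 1,
  channel: "agent-1",
  sender: "orchestrator",
  content: "Task tx-abc123 is ready for review",
  status: "pending",
  correlationId: "req-001",
  taskId: "tx-abc123",
  metadata: { priority: "high" },
  createdAt: "2025-01-28T10:00:00.000Z",
  ackedAt: null,
  expiresAt: expiresAtFromTtl("2025-01-28T10:00:00.000Z", 3600),
};
```

A client-side check like isExpired is only a convenience for filtering stale messages locally; whether the server already hides expired messages from inbox reads is not stated here.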
Use Case: Agent Coordination
Fan-out with Cursors
Multiple agents can read from the same channel independently using cursor-based pagination. Each agent tracks its own afterId cursor:
# Agent 1 reads messages
CURSOR_1=0
MESSAGES=$(tx inbox shared-work --after "$CURSOR_1" --json)
# Keep the current cursor when no new messages are returned
CURSOR_1=$(echo "$MESSAGES" | jq -r --argjson cur "$CURSOR_1" '.[-1].id // $cur')
# Agent 2 reads the same messages independently
CURSOR_2=0
MESSAGES=$(tx inbox shared-work --after "$CURSOR_2" --json)
CURSOR_2=$(echo "$MESSAGES" | jq -r --argjson cur "$CURSOR_2" '.[-1].id // $cur')
Request/Reply Pattern
Use correlationId to match responses to requests:
# Requester sends
tx send reviewer "Please review PR #42" --correlation review-42
# Responder reads and replies
tx inbox reviewer --json | jq -r '.[] | select(.correlationId == "review-42")'
tx send requester "LGTM, approved" --correlation review-42
# Requester reads the reply
tx inbox requester --correlation review-42 --json
Task-Scoped Channels
Use task:<id> channel naming to scope messages to a task:
# Send updates about a specific task
tx send "task:tx-abc123" "Started implementation" --task tx-abc123
tx send "task:tx-abc123" "Tests passing, ready for review" --task tx-abc123
# Read task-specific discussion
tx inbox "task:tx-abc123"
Agent Loop with Inbox Polling
#!/bin/bash
AGENT_ID="worker-$$"
CURSOR=0
while true; do
  # Check for new messages
  MESSAGES=$(tx inbox "$AGENT_ID" --after "$CURSOR" --json)
  COUNT=$(echo "$MESSAGES" | jq length)
  if [ "$COUNT" -gt 0 ]; then
    # Update cursor
    CURSOR=$(echo "$MESSAGES" | jq -r '.[-1].id')
    # Process each message
    echo "$MESSAGES" | jq -c '.[]' | while read -r msg; do
      CONTENT=$(echo "$msg" | jq -r '.content')
      MSG_ID=$(echo "$msg" | jq -r '.id')
      # Handle the message
      echo "Processing: $CONTENT"
      # Acknowledge when done
      tx ack "$MSG_ID"
    done
  fi
  sleep 2
done
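The same loop can be sketched in TypeScript. The MessagesClient interface below is a hypothetical, narrowed stand-in modelled on the tx.messages.inbox and tx.messages.ack calls shown in the SDK examples on this page; the real SDK types may differ. pollOnce and its handle callback are names invented for this sketch. One iteration reads after the cursor, handles each message, acknowledges it, and only then advances the cursor.

```typescript
// Minimal message shape needed by the loop.
interface InboxMessage {
  id: number;
  content: string;
}

// Hypothetical client surface, modelled on tx.messages.inbox / tx.messages.ack
// from the SDK examples above. The real SDK types may differ.
interface MessagesClient {
  inbox(channel: string, opts: { afterId: number }): Promise<InboxMessage[]>;
  ack(id: number): Promise<unknown>;
}

// One polling iteration: returns the cursor to use for the next call.
async function pollOnce(
  client: MessagesClient,
  channel: string,
  cursor: number,
  handle: (msg: InboxMessage) => Promise<void> | void
): Promise<number> {
  const messages = await client.inbox(channel, { afterId: cursor });
  let next = cursor;
  for (const msg of messages) {
    await handle(msg);         // do the work first
    await client.ack(msg.id);  // acknowledge only after handling succeeds
    next = Math.max(next, msg.id);
  }
  return next; // unchanged when no new messages arrived
}
```

A caller would wrap this in a loop with a delay, for example calling pollOnce with tx.messages as the client and sleeping two seconds between iterations as the bash version does. If handle throws, pollOnce rejects before the cursor is returned, so the caller keeps its previous cursor and unacknowledged messages are read again on the next iteration.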