tx send / tx inbox

Channel-based agent-to-agent messaging

Purpose

tx send and tx inbox provide channel-based messaging for agent-to-agent communication. Messages are persistent, read through cursors, and support acknowledgment for reliable delivery.

Key properties:

  • Channel-based: Messages are organized into named channels (e.g., agent ID, topic, task:tx-abc123)
  • Cursor-based fan-out: Multiple consumers read from the same channel using afterId cursors
  • Read-only inbox: Reading messages has no side effects; an explicit tx ack is required to acknowledge a message
  • TTL support: Messages can auto-expire after a configurable duration
  • Correlation IDs: Request/reply patterns via correlationId matching

At A Glance

Read Without Consume

Inbox reads are pure. Multiple consumers can see the same pending message until someone explicitly acknowledges it.
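
The read-without-consume behavior can be modeled in a few lines. The sketch below is an in-memory stand-in, not the tx implementation; SketchInbox and its methods are hypothetical names used only for illustration.

```typescript
// In-memory model of a channel inbox (hypothetical; not the tx implementation).
type SketchMessage = { id: number; content: string; status: "pending" | "acked" };

class SketchInbox {
  private messages: SketchMessage[] = [];
  private nextId = 1;

  // Sending appends a pending message and returns its ID.
  send(content: string): number {
    const id = this.nextId++;
    this.messages.push({ id, content, status: "pending" });
    return id;
  }

  // Reading is pure: it filters and copies, and never mutates stored messages.
  read(afterId = 0): SketchMessage[] {
    return this.messages
      .filter((m) => m.status === "pending" && m.id > afterId)
      .map((m) => ({ ...m }));
  }

  // Only an explicit ack changes a message's status.
  ack(id: number): void {
    const found = this.messages.find((m) => m.id === id);
    if (found) found.status = "acked";
  }
}

const sketchInbox = new SketchInbox();
const firstId = sketchInbox.send("Task tx-abc123 is ready for review");

// Two consumers read independently and both see the same pending message.
const seenByA = sketchInbox.read();
const seenByB = sketchInbox.read();

// After one consumer acknowledges, the message leaves the default (pending) view.
sketchInbox.ack(firstId);
const seenAfterAck = sketchInbox.read();
```

Because read returns copies, a consumer cannot change stored state by accident; the only way to affect other readers is to call ack.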

Message State

Only send, ack, TTL expiry, and garbage collection change message state. A read never does; it only advances the cursor that the consumer tracks for itself.
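
As a rough illustration of these transitions, the following sketch models send, ack, and TTL expiry as pure functions. The field names follow the Message Schema section; the function names are hypothetical, and two behaviors are assumptions rather than documented facts: a repeated ack is treated as a no-op, and a message counts as expired once the current time reaches expiresAt.

```typescript
// Hypothetical model of message state; field names follow the Message Schema.
type StateMessage = {
  id: number;
  status: "pending" | "acked";
  createdAt: string;
  ackedAt: string | null;
  expiresAt: string | null;
};

// Send: a new message starts as pending; a TTL (in seconds) sets expiresAt.
function createMessage(id: number, now: Date, ttlSeconds?: number): StateMessage {
  const expiresAt =
    ttlSeconds === undefined
      ? null
      : new Date(now.getTime() + ttlSeconds * 1000).toISOString();
  return { id, status: "pending", createdAt: now.toISOString(), ackedAt: null, expiresAt };
}

// Ack: pending -> acked. Treating a repeated ack as a no-op is an assumption.
function ackMessage(msg: StateMessage, now: Date): StateMessage {
  if (msg.status === "acked") return msg;
  return { ...msg, status: "acked", ackedAt: now.toISOString() };
}

// TTL expiry: assumed here to mean "the current time is at or past expiresAt".
function isExpired(msg: StateMessage, now: Date): boolean {
  return msg.expiresAt !== null && now.getTime() >= Date.parse(msg.expiresAt);
}

const sentAt = new Date("2025-01-28T10:00:00.000Z");
const pendingMsg = createMessage(1, sentAt, 3600);
const ackedMsg = ackMessage(pendingMsg, new Date("2025-01-28T10:15:00.000Z"));
// pendingMsg.expiresAt is "2025-01-28T11:00:00.000Z" (createdAt plus 3600 seconds)
```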

Send a Message

tx send <channel> <content> [options]

Arguments

Argument   Required  Description
<channel>  Yes       Channel name (e.g., agent-1, task:tx-abc123)
<content>  Yes       Message content

Options

Option              Description
--sender <s>        Sender name (default: cli)
--task <id>         Associated task ID
--ttl <sec>         Time-to-live in seconds
--correlation <id>  Correlation ID for request/reply
--metadata '{}'     JSON metadata string
--json              Output as JSON

Examples

# Send a simple message
tx send agent-1 "Task tx-abc123 is ready for review"

# Send with metadata and TTL
tx send builds "Deploy v2.3.1 complete" --sender ci --ttl 3600 --metadata '{"version":"2.3.1"}'

# Send with correlation for request/reply
tx send planner "Need architecture review" --correlation req-001

# JSON output
tx send agent-1 "hello" --json

TypeScript SDK

import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const message = await tx.messages.send({
  channel: 'agent-1',
  content: 'Task tx-abc123 is ready for review',
  sender: 'orchestrator',
  ttlSeconds: 3600
})
// Returns: { id, channel, sender, content, status, correlationId, taskId, metadata, createdAt, ackedAt, expiresAt }

MCP

{
  "tool": "tx_send",
  "arguments": {
    "channel": "agent-1",
    "content": "Task tx-abc123 is ready for review",
    "sender": "planner",
    "taskId": "tx-abc123",
    "ttlSeconds": 3600,
    "correlationId": "req-001",
    "metadata": "{\"priority\":\"high\"}"
  }
}

Parameters

Parameter      Required  Description
channel        Yes       Channel name
content        Yes       Message content
sender         No        Sender name (default: mcp)
taskId         No        Associated task ID
ttlSeconds     No        Time-to-live in seconds
correlationId  No        Correlation ID for request/reply
metadata       No        JSON metadata string

REST API

POST /api/messages
Content-Type: application/json

{
  "channel": "agent-1",
  "content": "Task tx-abc123 is ready for review",
  "sender": "orchestrator",
  "taskId": "tx-abc123",
  "ttlSeconds": 3600,
  "correlationId": "req-001",
  "metadata": { "priority": "high" }
}

Response (201)

{
  "id": 1,
  "channel": "agent-1",
  "sender": "orchestrator",
  "content": "Task tx-abc123 is ready for review",
  "status": "pending",
  "correlationId": "req-001",
  "taskId": "tx-abc123",
  "metadata": { "priority": "high" },
  "createdAt": "2025-01-28T10:00:00.000Z",
  "ackedAt": null,
  "expiresAt": "2025-01-28T11:00:00.000Z"
}

Example

curl -X POST http://localhost:3456/api/messages \
  -H "Content-Type: application/json" \
  -d '{"channel": "agent-1", "content": "hello", "sender": "orchestrator"}'
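
The examples in this section pass metadata in two shapes: the MCP tool call shows a JSON string, while the REST request body shows a JSON object. A caller that targets both could convert between them with a small helper. The sketch below is based only on those two examples; toMcpSendArguments and both type names are hypothetical.

```typescript
// Request body shape as shown in the REST example (metadata is an object).
type RestSendBody = {
  channel: string;
  content: string;
  sender?: string;
  taskId?: string;
  ttlSeconds?: number;
  correlationId?: string;
  metadata?: Record<string, unknown>;
};

// Argument shape as shown in the MCP example (metadata is a JSON string).
type McpSendArguments = Omit<RestSendBody, "metadata"> & { metadata?: string };

// Hypothetical helper: serialize metadata so the same payload suits the MCP tool.
function toMcpSendArguments(body: RestSendBody): McpSendArguments {
  const { metadata, ...rest } = body;
  return metadata === undefined ? rest : { ...rest, metadata: JSON.stringify(metadata) };
}

const mcpArguments = toMcpSendArguments({
  channel: "agent-1",
  content: "Task tx-abc123 is ready for review",
  sender: "planner",
  metadata: { priority: "high" },
});
// mcpArguments.metadata is the string '{"priority":"high"}'
```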

Read Inbox

Read messages from a channel. This is a pure read with no side effects: messages are not marked as read or consumed.

tx inbox <channel> [options]

Arguments

Argument   Required  Description
<channel>  Yes       Channel to read from

Options

Option              Description
--after <id>        Cursor: only messages with ID greater than this value
--limit <n>         Max messages to return
--sender <s>        Filter by sender
--correlation <id>  Filter by correlation ID
--include-acked     Include acknowledged messages
--json              Output as JSON

Examples

# Read all pending messages
tx inbox agent-1

# Cursor-based pagination
tx inbox agent-1 --after 42

# Filter by sender
tx inbox agent-1 --sender planner

# Include already-acknowledged messages
tx inbox agent-1 --include-acked

# JSON output for scripting
tx inbox agent-1 --json

Output

[1] 2025-01-28 10:00:00 orchestrator: Task tx-abc123 is ready for review
[2] 2025-01-28 10:05:00 ci: Build passed for branch feature/auth

2 message(s)

TypeScript SDK

import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const messages = await tx.messages.inbox('agent-1', { limit: 10, afterId: 42 })
// messages: Array<{ id, channel, sender, content, status, ... }>

MCP

{
  "tool": "tx_inbox",
  "arguments": {
    "channel": "agent-1",
    "afterId": 42,
    "limit": 10,
    "sender": "planner",
    "includeAcked": false
  }
}

Parameters

Parameter      Required  Description
channel        Yes       Channel to read from
afterId        No        Cursor: only messages with ID > this value
limit          No        Max messages (default: 50)
sender         No        Filter by sender
correlationId  No        Filter by correlation ID
includeAcked   No        Include acknowledged messages

REST API

GET /api/messages/inbox/:channel

Query Parameters

Parameter      Description
afterId        Cursor: only messages with ID > this value
limit          Max messages to return
sender         Filter by sender
correlationId  Filter by correlation ID
includeAcked   Include acknowledged messages (true/false)

Response (200)

{
  "messages": [
    {
      "id": 1,
      "channel": "agent-1",
      "sender": "orchestrator",
      "content": "Task tx-abc123 is ready for review",
      "status": "pending",
      "correlationId": null,
      "taskId": null,
      "metadata": {},
      "createdAt": "2025-01-28T10:00:00.000Z",
      "ackedAt": null,
      "expiresAt": null
    }
  ],
  "channel": "agent-1",
  "count": 1
}

Example

# Quote the URL so the shell does not interpret ? and &
curl "http://localhost:3456/api/messages/inbox/agent-1?limit=10&afterId=0"
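
When the inbox URL is built in code rather than typed by hand, the standard URL and URLSearchParams APIs handle the encoding. The sketch below uses the query parameters documented above; buildInboxUrl and InboxQuery are hypothetical names, and percent-encoding the channel name in the path is an assumption about what the server accepts.

```typescript
// Query parameters documented for GET /api/messages/inbox/:channel.
type InboxQuery = {
  afterId?: number;
  limit?: number;
  sender?: string;
  correlationId?: string;
  includeAcked?: boolean;
};

// Hypothetical helper: build the inbox URL with encoded path and query parts.
function buildInboxUrl(baseUrl: string, channel: string, query: InboxQuery = {}): string {
  const url = new URL(`/api/messages/inbox/${encodeURIComponent(channel)}`, baseUrl);
  for (const [key, value] of Object.entries(query)) {
    // Skip parameters that were not provided.
    if (value !== undefined) url.searchParams.set(key, String(value));
  }
  return url.toString();
}

const inboxUrl = buildInboxUrl("http://localhost:3456", "agent-1", { limit: 10, afterId: 0 });
// inboxUrl is "http://localhost:3456/api/messages/inbox/agent-1?limit=10&afterId=0"
```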

Acknowledge a Message

Mark a single message as acknowledged. This transitions its status from pending to acked.

tx ack <message-id> [options]

Arguments

Argument      Required  Description
<message-id>  Yes       Numeric message ID

Options

Option  Description
--json  Output as JSON

Examples

tx ack 42

TypeScript SDK

import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const message = await tx.messages.ack(42)

MCP

{
  "tool": "tx_ack",
  "arguments": {
    "id": 42
  }
}

REST API

POST /api/messages/:id/ack

Response (200)

{
  "message": {
    "id": 42,
    "channel": "agent-1",
    "sender": "orchestrator",
    "content": "Task tx-abc123 is ready for review",
    "status": "acked",
    "correlationId": null,
    "taskId": null,
    "metadata": {},
    "createdAt": "2025-01-28T10:00:00.000Z",
    "ackedAt": "2025-01-28T10:15:00.000Z",
    "expiresAt": null
  }
}

Example

curl -X POST http://localhost:3456/api/messages/42/ack

Acknowledge All Messages

Acknowledge all pending messages on a channel at once.

tx ack all <channel> [options]

Arguments

Argument   Required  Description
<channel>  Yes       Channel to acknowledge

Options

Option  Description
--json  Output as JSON

Examples

tx ack all agent-1

TypeScript SDK

import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const { channel, ackedCount } = await tx.messages.ackAll('agent-1')

MCP

{
  "tool": "tx_ack_all",
  "arguments": {
    "channel": "agent-1"
  }
}

REST API

POST /api/messages/inbox/:channel/ack

Response (200)

{
  "channel": "agent-1",
  "ackedCount": 5
}

Example

curl -X POST http://localhost:3456/api/messages/inbox/agent-1/ack

Pending Count

Count unacknowledged messages on a channel.

tx outbox pending <channel> [options]

Options

Option  Description
--json  Output as JSON

Examples

tx outbox pending agent-1

TypeScript SDK

import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const count = await tx.messages.pending('agent-1')

MCP

{
  "tool": "tx_outbox_pending",
  "arguments": {
    "channel": "agent-1"
  }
}

REST API

GET /api/messages/inbox/:channel/count

Response (200)

{
  "channel": "agent-1",
  "count": 3
}

Example

curl http://localhost:3456/api/messages/inbox/agent-1/count
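
The pending count and ack-all operations are two views of the same per-channel state. The sketch below models that relationship with an in-memory array; it is not the tx implementation, and the function names are hypothetical. It assumes ack-all only touches pending messages on the named channel, so ackedCount equals the pending count taken just before the call.

```typescript
// Hypothetical in-memory store covering several channels (not the tx implementation).
type ChannelMessage = { id: number; channel: string; status: "pending" | "acked" };

const sketchStore: ChannelMessage[] = [
  { id: 1, channel: "agent-1", status: "pending" },
  { id: 2, channel: "agent-1", status: "acked" },
  { id: 3, channel: "agent-1", status: "pending" },
  { id: 4, channel: "agent-2", status: "pending" },
];

// Pending count: unacknowledged messages on one channel.
function pendingCount(messages: ChannelMessage[], channel: string): number {
  return messages.filter((m) => m.channel === channel && m.status === "pending").length;
}

// Ack all: acknowledge every pending message on the channel and report how many changed.
function ackAll(
  messages: ChannelMessage[],
  channel: string,
): { channel: string; ackedCount: number } {
  let ackedCount = 0;
  for (const m of messages) {
    if (m.channel === channel && m.status === "pending") {
      m.status = "acked";
      ackedCount += 1;
    }
  }
  return { channel, ackedCount };
}

const pendingBefore = pendingCount(sketchStore, "agent-1"); // 2
const ackAllResult = ackAll(sketchStore, "agent-1"); // { channel: "agent-1", ackedCount: 2 }
const pendingAfter = pendingCount(sketchStore, "agent-1"); // 0
```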

Garbage Collection

Clean up expired and old acknowledged messages.

tx outbox gc [options]

Options

Option                      Description
--acked-older-than <hours>  Remove acked messages older than N hours
--json                      Output as JSON

Examples

# Remove expired messages and acked messages older than 24 hours
tx outbox gc --acked-older-than 24

TypeScript SDK

import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const { expired, acked } = await tx.messages.gc({ ackedOlderThanHours: 24 })

MCP

There is no dedicated MCP tool for garbage collection. Use the REST API or CLI:

{
  "note": "Use REST API endpoint POST /api/messages/gc or CLI: tx outbox gc"
}

REST API

POST /api/messages/gc
Content-Type: application/json

{
  "ackedOlderThanHours": 24
}

Response (200)

{
  "expired": 3,
  "acked": 12
}

Example

curl -X POST http://localhost:3456/api/messages/gc \
  -H "Content-Type: application/json" \
  -d '{"ackedOlderThanHours": 24}'
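
To make the two counters in the response concrete, here is a sketch of one plausible selection rule. It is not the tx implementation. The assumptions are that a message is expired once expiresAt is at or before the current time, that acked messages are aged by their ackedAt timestamp, and that a message matching both rules is counted as expired. The function and type names are hypothetical.

```typescript
// Hypothetical model of garbage collection; not the tx implementation.
type GcMessage = {
  id: number;
  status: "pending" | "acked";
  ackedAt: string | null;
  expiresAt: string | null;
};

// Returns the surviving messages plus counters shaped like the documented response.
function gcSketch(
  messages: GcMessage[],
  now: Date,
  ackedOlderThanHours?: number,
): { kept: GcMessage[]; expired: number; acked: number } {
  let expired = 0;
  let acked = 0;
  const kept: GcMessage[] = [];
  for (const m of messages) {
    // Assumption: expired means expiresAt is at or before `now`.
    const isExpired = m.expiresAt !== null && Date.parse(m.expiresAt) <= now.getTime();
    // Assumption: age of an acked message is measured from ackedAt.
    const isOldAck =
      ackedOlderThanHours !== undefined &&
      m.ackedAt !== null &&
      now.getTime() - Date.parse(m.ackedAt) > ackedOlderThanHours * 3600 * 1000;
    if (isExpired) expired += 1;
    else if (isOldAck) acked += 1;
    else kept.push(m);
  }
  return { kept, expired, acked };
}

const gcResult = gcSketch(
  [
    { id: 1, status: "pending", ackedAt: null, expiresAt: "2025-01-28T11:00:00.000Z" }, // expired
    { id: 2, status: "acked", ackedAt: "2025-01-28T10:15:00.000Z", expiresAt: null }, // acked over 24h ago
    { id: 3, status: "acked", ackedAt: "2025-01-30T11:00:00.000Z", expiresAt: null }, // acked 1h ago
    { id: 4, status: "pending", ackedAt: null, expiresAt: null }, // still pending
  ],
  new Date("2025-01-30T12:00:00.000Z"),
  24,
);
// gcResult.expired is 1, gcResult.acked is 1, and messages 3 and 4 are kept
```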

Message Schema

Every message contains:

Field          Type                     Description
id             number                   Auto-incrementing message ID
channel        string                   Channel the message belongs to
sender         string                   Who sent the message
content        string                   Message body
status         "pending" | "acked"      Current status
correlationId  string | null            For request/reply patterns
taskId         string | null            Associated task ID
metadata       Record<string, unknown>  Arbitrary key-value data
createdAt      string                   ISO 8601 timestamp
ackedAt        string | null            When the message was acknowledged
expiresAt      string | null            When the message expires (from TTL)
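
For callers that parse REST responses by hand, the schema can be written as a TypeScript interface with a runtime check. The interface below is transcribed from the table above; the SDK may export its own type, so TxMessage and isTxMessage are hypothetical names for this sketch.

```typescript
// Message shape transcribed from the schema table above.
interface TxMessage {
  id: number;
  channel: string;
  sender: string;
  content: string;
  status: "pending" | "acked";
  correlationId: string | null;
  taskId: string | null;
  metadata: Record<string, unknown>;
  createdAt: string; // ISO 8601
  ackedAt: string | null;
  expiresAt: string | null;
}

// Hypothetical runtime check for untyped JSON (for example, a parsed REST response).
function isTxMessage(value: unknown): value is TxMessage {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  const nullableString = (x: unknown): boolean => x === null || typeof x === "string";
  const metadata = v["metadata"];
  return (
    typeof v["id"] === "number" &&
    typeof v["channel"] === "string" &&
    typeof v["sender"] === "string" &&
    typeof v["content"] === "string" &&
    (v["status"] === "pending" || v["status"] === "acked") &&
    nullableString(v["correlationId"]) &&
    nullableString(v["taskId"]) &&
    typeof metadata === "object" && metadata !== null && !Array.isArray(metadata) &&
    typeof v["createdAt"] === "string" &&
    nullableString(v["ackedAt"]) &&
    nullableString(v["expiresAt"])
  );
}

const parsedMessage: unknown = JSON.parse(
  '{"id":1,"channel":"agent-1","sender":"orchestrator","content":"hello","status":"pending",' +
    '"correlationId":null,"taskId":null,"metadata":{},"createdAt":"2025-01-28T10:00:00.000Z",' +
    '"ackedAt":null,"expiresAt":null}',
);
const parsedIsValid = isTxMessage(parsedMessage);
```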

Use Case: Agent Coordination

Fan-out with Cursors

Multiple agents can read from the same channel independently using cursor-based pagination. Each agent tracks its own afterId cursor:

# Agent 1 reads messages and advances its own cursor
CURSOR_1=0
MESSAGES=$(tx inbox shared-work --after "$CURSOR_1" --json)
# Keep the previous cursor when the batch is empty
CURSOR_1=$(echo "$MESSAGES" | jq -r --argjson prev "$CURSOR_1" '.[-1].id // $prev')

# Agent 2 reads the same messages independently
CURSOR_2=0
MESSAGES=$(tx inbox shared-work --after "$CURSOR_2" --json)
CURSOR_2=$(echo "$MESSAGES" | jq -r --argjson prev "$CURSOR_2" '.[-1].id // $prev')
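
The same cursor bookkeeping can be expressed as a small pure function. This sketch assumes batches arrive in ascending ID order, as the `.[-1].id` expression in the shell example does; advanceCursor is a hypothetical helper, not part of the SDK.

```typescript
// Hypothetical helper: advance a consumer's cursor from a batch of inbox messages.
// Assumes messages arrive in ascending ID order.
function advanceCursor(cursor: number, batch: ReadonlyArray<{ id: number }>): number {
  const last = batch[batch.length - 1];
  // An empty batch leaves the cursor where it was instead of resetting it.
  return last === undefined ? cursor : last.id;
}

// Each agent keeps its own cursor, so reads on a shared channel do not interfere.
const sharedBatch = [{ id: 41 }, { id: 42 }];
const cursorAgent1 = advanceCursor(0, sharedBatch); // 42
const cursorAgent2 = advanceCursor(41, sharedBatch.filter((m) => m.id > 41)); // 42
const cursorAfterEmptyRead = advanceCursor(cursorAgent1, []); // still 42
```

Keeping the cursor unchanged on an empty read matters in a polling loop: resetting it to 0 would replay the whole channel on the next pass.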

Request/Reply Pattern

Use correlationId to match responses to requests:

# Requester sends
tx send reviewer "Please review PR #42" --correlation review-42

# Responder reads and replies
tx inbox reviewer --json | jq -r '.[] | select(.correlationId == "review-42")'
tx send requester "LGTM, approved" --correlation review-42

# Requester reads the reply
tx inbox requester --correlation review-42 --json
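
On the requester side, matching a reply comes down to filtering its inbox by the correlation ID it sent. A minimal sketch of that matching step follows, with hypothetical names (findReply, CorrelatedMessage) and sample data invented for illustration.

```typescript
type CorrelatedMessage = {
  id: number;
  sender: string;
  content: string;
  correlationId: string | null;
};

// Hypothetical helper: pick the first message on the requester's channel whose
// correlationId matches the one sent with the request.
function findReply(
  inbox: ReadonlyArray<CorrelatedMessage>,
  correlationId: string,
): CorrelatedMessage | undefined {
  return inbox.find((m) => m.correlationId === correlationId);
}

// Sample inbox contents (illustrative data only).
const requesterInbox: CorrelatedMessage[] = [
  { id: 7, sender: "ci", content: "Build passed", correlationId: null },
  { id: 8, sender: "reviewer", content: "LGTM, approved", correlationId: "review-42" },
];
const matchedReply = findReply(requesterInbox, "review-42");
const missingReply = findReply(requesterInbox, "review-99");
```

Unrelated traffic on the same channel (message 7 here) is ignored, which is why a unique correlation ID per request keeps replies unambiguous.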

Task-Scoped Channels

Use task:<id> channel naming to scope messages to a task:

# Send updates about a specific task
tx send "task:tx-abc123" "Started implementation" --task tx-abc123
tx send "task:tx-abc123" "Tests passing, ready for review" --task tx-abc123

# Read task-specific discussion
tx inbox "task:tx-abc123"
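
If channel names are built in code, a pair of helpers keeps the task:<id> convention in one place. These helpers are hypothetical and only encode the naming pattern shown above; tx itself may not treat such channel names specially.

```typescript
const TASK_PREFIX = "task:";

// Hypothetical helper: build a task-scoped channel name from a task ID.
function taskChannel(taskId: string): string {
  return `${TASK_PREFIX}${taskId}`;
}

// Returns the task ID for a task-scoped channel, or null for any other channel name.
function taskIdFromChannel(channel: string): string | null {
  return channel.startsWith(TASK_PREFIX) && channel.length > TASK_PREFIX.length
    ? channel.slice(TASK_PREFIX.length)
    : null;
}

const scopedChannel = taskChannel("tx-abc123"); // "task:tx-abc123"
const recoveredTaskId = taskIdFromChannel(scopedChannel); // "tx-abc123"
```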

Agent Loop with Inbox Polling

#!/bin/bash
AGENT_ID="worker-$$"
CURSOR=0

while true; do
  # Check for new messages
  MESSAGES=$(tx inbox "$AGENT_ID" --after $CURSOR --json)
  COUNT=$(echo "$MESSAGES" | jq length)

  if [ "$COUNT" -gt 0 ]; then
    # Update cursor
    CURSOR=$(echo "$MESSAGES" | jq -r '.[-1].id')

    # Process each message
    echo "$MESSAGES" | jq -c '.[]' | while read -r msg; do
      CONTENT=$(echo "$msg" | jq -r '.content')
      MSG_ID=$(echo "$msg" | jq -r '.id')

      # Handle the message
      echo "Processing: $CONTENT"

      # Acknowledge when done
      tx ack "$MSG_ID"
    done
  fi

  sleep 2
done
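
The same loop can be sketched in TypeScript against a minimal client interface. InboxClient is a hypothetical interface defined here so the example stays self-contained; the SDK examples on this page suggest tx.messages.inbox and tx.messages.ack could back it, but that mapping is an assumption. Unlike the shell version, this sketch advances the cursor one message at a time, after the handler and the ack both succeed.

```typescript
type LoopMessage = { id: number; content: string };

// Minimal client surface the loop needs (hypothetical interface).
interface InboxClient {
  inbox(channel: string, options: { afterId: number }): Promise<LoopMessage[]>;
  ack(id: number): Promise<unknown>;
}

type MessageHandler = (msg: LoopMessage) => Promise<void> | void;

// One polling pass: read after the cursor, handle and ack each message,
// and return the new cursor.
async function pollOnce(
  client: InboxClient,
  channel: string,
  cursor: number,
  handle: MessageHandler,
): Promise<number> {
  const batch = await client.inbox(channel, { afterId: cursor });
  let next = cursor;
  for (const msg of batch) {
    await handle(msg);
    await client.ack(msg.id); // acknowledge only after the handler succeeds
    next = msg.id; // advance past messages that were fully processed
  }
  return next;
}

// Poll until shouldStop returns true, sleeping between passes.
async function runLoop(
  client: InboxClient,
  channel: string,
  handle: MessageHandler,
  shouldStop: () => boolean,
  intervalMs = 2000,
): Promise<void> {
  let cursor = 0;
  while (!shouldStop()) {
    cursor = await pollOnce(client, channel, cursor, handle);
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
}
```

If the handler throws, pollOnce rejects before the ack, so the message stays pending and can be picked up again by a later pass that starts from the last cursor value the caller kept.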
