tx

tx msg send / tx msg inbox

Channel-based agent-to-agent messaging

Purpose

tx msg send and tx msg inbox provide channel-based messaging for agent-to-agent communication. Messages are persistent, cursor-based, and support acknowledgment for reliable delivery.

Key properties:

  • Channel-based: Messages are organized into named channels (e.g., agent ID, topic, task:tx-abc123)
  • Cursor-based fan-out: Multiple consumers read from the same channel using afterId cursors
  • Read-only inbox: Reading messages has no side effects. Explicit tx msg ack is required.
  • TTL support: Messages can auto-expire after a configurable duration
  • Correlation IDs: Request/reply patterns via correlationId matching

At A Glance

Read Without Consume

Rendering diagram…
Inbox reads are pure. Multiple consumers can see the same pending message until someone explicitly acknowledges it.

Message State

Rendering diagram…
Only send, ack, TTL expiry, and garbage collection change message state. Reads just move the consumer cursor.

Send a Message

tx msg send <channel> <content> [options]

Arguments

ArgumentRequiredDescription
<channel>YesChannel name (e.g., agent-1, task:tx-abc123)
<content>YesMessage content

Options

OptionDescription
--sender <s>Sender name (default: cli)
--task <id>Associated task ID
--ttl <sec>Time-to-live in seconds
--correlation <id>Correlation ID for request/reply
--metadata '{}'JSON metadata string
--jsonOutput as JSON

Examples

# Send a simple message
tx msg send agent-1 "Task tx-abc123 is ready for review"

# Send with metadata and TTL
tx msg send builds "Deploy v2.3.1 complete" --sender ci --ttl 3600 --metadata '{"version":"2.3.1"}'

# Send with correlation for request/reply
tx msg send planner "Need architecture review" --correlation req-001

# JSON output
tx msg send agent-1 "hello" --json
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const message = await tx.messages.send({
  channel: 'agent-1',
  content: 'Task tx-abc123 is ready for review',
  sender: 'orchestrator',
  ttlSeconds: 3600
})
// Returns: { id, channel, sender, content, status, correlationId, taskId, metadata, createdAt, ackedAt, expiresAt }
{
  "tool": "tx_msg_send",
  "arguments": {
    "channel": "agent-1",
    "content": "Task tx-abc123 is ready for review",
    "sender": "planner",
    "taskId": "tx-abc123",
    "ttlSeconds": 3600,
    "correlationId": "req-001",
    "metadata": "{\"priority\":\"high\"}"
  }
}

Parameters

ParameterRequiredDescription
channelYesChannel name
contentYesMessage content
senderNoSender name (default: mcp)
taskIdNoAssociated task ID
ttlSecondsNoTime-to-live in seconds
correlationIdNoCorrelation ID for request/reply
metadataNoJSON metadata string
POST /api/messages
Content-Type: application/json

{
  "channel": "agent-1",
  "content": "Task tx-abc123 is ready for review",
  "sender": "orchestrator",
  "taskId": "tx-abc123",
  "ttlSeconds": 3600,
  "correlationId": "req-001",
  "metadata": { "priority": "high" }
}

Response (201)

{
  "id": 1,
  "channel": "agent-1",
  "sender": "orchestrator",
  "content": "Task tx-abc123 is ready for review",
  "status": "pending",
  "correlationId": "req-001",
  "taskId": "tx-abc123",
  "metadata": { "priority": "high" },
  "createdAt": "2025-01-28T10:00:00.000Z",
  "ackedAt": null,
  "expiresAt": "2025-01-28T11:00:00.000Z"
}

Example

curl -X POST http://localhost:3456/api/messages \
  -H "Content-Type: application/json" \
  -d '{"channel": "agent-1", "content": "hello", "sender": "orchestrator"}'

Read Inbox

Read messages from a channel. This is a pure read with no side effects. Messages are not marked as read or consumed.

tx msg inbox <channel> [options]

Arguments

ArgumentRequiredDescription
<channel>YesChannel to read from

Options

OptionDescription
--after <id>Cursor: only messages with ID greater than this value
--limit <n>Max messages to return
--sender <s>Filter by sender
--correlation <id>Filter by correlation ID
--include-ackedInclude acknowledged messages
--jsonOutput as JSON

Examples

# Read all pending messages
tx msg inbox agent-1

# Cursor-based pagination
tx msg inbox agent-1 --after 42

# Filter by sender
tx msg inbox agent-1 --sender planner

# Include already-acknowledged messages
tx msg inbox agent-1 --include-acked

# JSON output for scripting
tx msg inbox agent-1 --json

Output

[1] 2025-01-28 10:00:00 orchestrator: Task tx-abc123 is ready for review
[2] 2025-01-28 10:05:00 ci: Build passed for branch feature/auth

2 message(s)
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const messages = await tx.messages.inbox('agent-1', { limit: 10, afterId: 42 })
// messages: Array<{ id, channel, sender, content, status, ... }>
{
  "tool": "tx_msg_inbox",
  "arguments": {
    "channel": "agent-1",
    "afterId": 42,
    "limit": 10,
    "sender": "planner",
    "includeAcked": false
  }
}

Parameters

ParameterRequiredDescription
channelYesChannel to read from
afterIdNoCursor: only messages with ID > this value
limitNoMax messages (default: 50)
senderNoFilter by sender
correlationIdNoFilter by correlation ID
includeAckedNoInclude acknowledged messages
GET /api/messages/inbox/:channel

Query Parameters

ParameterDescription
afterIdCursor: only messages with ID > this value
limitMax messages to return
senderFilter by sender
correlationIdFilter by correlation ID
includeAckedInclude acknowledged messages (true/false)

Response (200)

{
  "messages": [
    {
      "id": 1,
      "channel": "agent-1",
      "sender": "orchestrator",
      "content": "Task tx-abc123 is ready for review",
      "status": "pending",
      "correlationId": null,
      "taskId": null,
      "metadata": {},
      "createdAt": "2025-01-28T10:00:00.000Z",
      "ackedAt": null,
      "expiresAt": null
    }
  ],
  "channel": "agent-1",
  "count": 1
}

Example

curl http://localhost:3456/api/messages/inbox/agent-1?limit=10&afterId=0

Acknowledge a Message

Mark a single message as acknowledged. This transitions its status from pending to acked.

tx msg ack <message-id> [options]

Arguments

ArgumentRequiredDescription
<message-id>YesNumeric message ID

Options

OptionDescription
--jsonOutput as JSON

Examples

tx msg ack 42
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const message = await tx.messages.ack(42)
{
  "tool": "tx_msg_ack",
  "arguments": {
    "id": 42
  }
}
POST /api/messages/:id/ack

Response (200)

{
  "message": {
    "id": 42,
    "channel": "agent-1",
    "sender": "orchestrator",
    "content": "Task tx-abc123 is ready for review",
    "status": "acked",
    "correlationId": null,
    "taskId": null,
    "metadata": {},
    "createdAt": "2025-01-28T10:00:00.000Z",
    "ackedAt": "2025-01-28T10:15:00.000Z",
    "expiresAt": null
  }
}

Example

curl -X POST http://localhost:3456/api/messages/42/ack

Acknowledge All Messages

Acknowledge all pending messages on a channel at once.

tx msg ack all <channel> [options]

Arguments

ArgumentRequiredDescription
<channel>YesChannel to acknowledge

Options

OptionDescription
--jsonOutput as JSON

Examples

tx msg ack all agent-1
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const { channel, ackedCount } = await tx.messages.ackAll('agent-1')
{
  "tool": "tx_msg_ack_all",
  "arguments": {
    "channel": "agent-1"
  }
}
POST /api/messages/inbox/:channel/ack

Response (200)

{
  "channel": "agent-1",
  "ackedCount": 5
}

Example

curl -X POST http://localhost:3456/api/messages/inbox/agent-1/ack

Pending Count

Count unacknowledged messages on a channel.

tx msg pending <channel> [options]

Options

OptionDescription
--jsonOutput as JSON

Examples

tx msg pending agent-1
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const count = await tx.messages.pending('agent-1')
{
  "tool": "tx_msg_pending",
  "arguments": {
    "channel": "agent-1"
  }
}
GET /api/messages/inbox/:channel/count

Response (200)

{
  "channel": "agent-1",
  "count": 3
}

Example

curl http://localhost:3456/api/messages/inbox/agent-1/count

Garbage Collection

Clean up expired and old acknowledged messages.

tx msg gc [options]

Options

OptionDescription
--acked-older-than <hours>Remove acked messages older than N hours
--jsonOutput as JSON

Examples

# Remove expired + acked messages older than 24 hours
tx msg gc --acked-older-than 24
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const { expired, acked } = await tx.messages.gc({ ackedOlderThanHours: 24 })
{
  "tool": "tx_msg_gc",
  "arguments": {
    "ackedOlderThanHours": 24
  }
}

Parameters

ParameterRequiredDescription
ackedOlderThanHoursNoRemove acked messages older than N hours (positive integer)

Returns

{
  "expired": 3,
  "acked": 12
}
POST /api/messages/gc
Content-Type: application/json

{
  "ackedOlderThanHours": 24
}

Response (200)

{
  "expired": 3,
  "acked": 12
}

Example

curl -X POST http://localhost:3456/api/messages/gc \
  -H "Content-Type: application/json" \
  -d '{"ackedOlderThanHours": 24}'

Message Schema

Every message contains:

FieldTypeDescription
idnumberAuto-incrementing message ID
channelstringChannel the message belongs to
senderstringWho sent the message
contentstringMessage body
status"pending" | "acked"Current status
correlationIdstring | nullFor request/reply patterns
taskIdstring | nullAssociated task ID
metadataRecord<string, unknown>Arbitrary key-value data
createdAtstringISO 8601 timestamp
ackedAtstring | nullWhen the message was acknowledged
expiresAtstring | nullWhen the message expires (from TTL)

Use Case: Agent Coordination

Fan-out with Cursors

Multiple agents can read from the same channel independently using cursor-based pagination. Each agent tracks its own afterId cursor:

# Agent 1 reads messages
CURSOR_1=0
MESSAGES=$(tx msg inbox shared-work --after $CURSOR_1 --json)
CURSOR_1=$(echo "$MESSAGES" | jq -r '.[-1].id // 0')

# Agent 2 reads the same messages independently
CURSOR_2=0
MESSAGES=$(tx msg inbox shared-work --after $CURSOR_2 --json)
CURSOR_2=$(echo "$MESSAGES" | jq -r '.[-1].id // 0')

Request/Reply Pattern

Use correlationId to match responses to requests:

# Requester sends
tx msg send reviewer "Please review PR #42" --correlation review-42

# Responder reads and replies
tx msg inbox reviewer --json | jq -r '.[] | select(.correlationId == "review-42")'
tx msg send requester "LGTM, approved" --correlation review-42

# Requester reads the reply
tx msg inbox requester --correlation review-42 --json

Task-Scoped Channels

Use task:<id> channel naming to scope messages to a task:

# Send updates about a specific task
tx msg send "task:tx-abc123" "Started implementation" --task tx-abc123
tx msg send "task:tx-abc123" "Tests passing, ready for review" --task tx-abc123

# Read task-specific discussion
tx msg inbox "task:tx-abc123"

Agent Loop with Inbox Polling

#!/bin/bash
AGENT_ID="worker-$$"
CURSOR=0

while true; do
  # Check for new messages
  MESSAGES=$(tx msg inbox "$AGENT_ID" --after $CURSOR --json)
  COUNT=$(echo "$MESSAGES" | jq length)

  if [ "$COUNT" -gt 0 ]; then
    # Update cursor
    CURSOR=$(echo "$MESSAGES" | jq -r '.[-1].id')

    # Process each message
    echo "$MESSAGES" | jq -c '.[]' | while read -r msg; do
      CONTENT=$(echo "$msg" | jq -r '.content')
      MSG_ID=$(echo "$msg" | jq -r '.id')

      # Handle the message
      echo "Processing: $CONTENT"

      # Acknowledge when done
      tx msg ack "$MSG_ID"
    done
  fi

  sleep 2
done

On this page