tx msg send / tx msg inbox Channel-based agent-to-agent messaging
tx msg send and tx msg inbox provide channel-based messaging for agent-to-agent communication. Messages are persistent, cursor-based, and support acknowledgment for reliable delivery.
Key properties:
Channel-based : Messages are organized into named channels (e.g., agent ID, topic, task:tx-abc123)
Cursor-based fan-out : Multiple consumers read from the same channel using afterId cursors
Read-only inbox : Reading messages has no side effects. Explicit tx msg ack is required.
TTL support : Messages can auto-expire after a configurable duration
Correlation IDs : Request/reply patterns via correlationId matching
Inbox reads are pure. Multiple consumers can see the same pending message until someone explicitly acknowledges it.
Only send, ack, TTL expiry, and garbage collection change message state. Reads just move the consumer cursor.
CLI TypeScript SDK MCP REST API
tx msg send < channe l > < conten t > [options] Argument Required Description <channel>Yes Channel name (e.g., agent-1, task:tx-abc123) <content>Yes Message content
Option Description --sender <s>Sender name (default: cli) --task <id>Associated task ID --ttl <sec>Time-to-live in seconds --correlation <id>Correlation ID for request/reply --metadata '{}'JSON metadata string --jsonOutput as JSON
# Send a simple message
tx msg send agent-1 "Task tx-abc123 is ready for review"
# Send with metadata and TTL
tx msg send builds "Deploy v2.3.1 complete" --sender ci --ttl 3600 --metadata '{"version":"2.3.1"}'
# Send with correlation for request/reply
tx msg send planner "Need architecture review" --correlation req-001
# JSON output
tx msg send agent-1 "hello" --json import { TxClient } from '@jamesaphoenix/tx-agent-sdk'
const tx = new TxClient ({ apiUrl: 'http://localhost:3456' })
const message = await tx.messages. send ({
channel: 'agent-1' ,
content: 'Task tx-abc123 is ready for review' ,
sender: 'orchestrator' ,
ttlSeconds: 3600
})
// Returns: { id, channel, sender, content, status, correlationId, taskId, metadata, createdAt, ackedAt, expiresAt } {
"tool" : "tx_msg_send" ,
"arguments" : {
"channel" : "agent-1" ,
"content" : "Task tx-abc123 is ready for review" ,
"sender" : "planner" ,
"taskId" : "tx-abc123" ,
"ttlSeconds" : 3600 ,
"correlationId" : "req-001" ,
"metadata" : "{ \" priority \" : \" high \" }"
}
} Parameter Required Description channelYes Channel name contentYes Message content senderNo Sender name (default: mcp) taskIdNo Associated task ID ttlSecondsNo Time-to-live in seconds correlationIdNo Correlation ID for request/reply metadataNo JSON metadata string
POST /api/messages
Content-Type: application/json
{
"channel" : "agent-1",
"content" : "Task tx-abc123 is ready for review",
"sender" : "orchestrator",
"taskId" : "tx-abc123",
"ttlSeconds" : 3600,
"correlationId" : "req-001",
"metadata" : { "priority": "high" }
} {
"id" : 1 ,
"channel" : "agent-1" ,
"sender" : "orchestrator" ,
"content" : "Task tx-abc123 is ready for review" ,
"status" : "pending" ,
"correlationId" : "req-001" ,
"taskId" : "tx-abc123" ,
"metadata" : { "priority" : "high" },
"createdAt" : "2025-01-28T10:00:00.000Z" ,
"ackedAt" : null ,
"expiresAt" : "2025-01-28T11:00:00.000Z"
} curl -X POST http://localhost:3456/api/messages \
-H "Content-Type: application/json" \
-d '{"channel": "agent-1", "content": "hello", "sender": "orchestrator"}'
Read messages from a channel. This is a pure read with no side effects. Messages are not marked as read or consumed.
CLI TypeScript SDK MCP REST API
tx msg inbox < channe l > [options] Argument Required Description <channel>Yes Channel to read from
Option Description --after <id>Cursor: only messages with ID greater than this value --limit <n>Max messages to return --sender <s>Filter by sender --correlation <id>Filter by correlation ID --include-ackedInclude acknowledged messages --jsonOutput as JSON
# Read all pending messages
tx msg inbox agent-1
# Cursor-based pagination
tx msg inbox agent-1 --after 42
# Filter by sender
tx msg inbox agent-1 --sender planner
# Include already-acknowledged messages
tx msg inbox agent-1 --include-acked
# JSON output for scripting
tx msg inbox agent-1 --json [1] 2025-01-28 10:00:00 orchestrator: Task tx-abc123 is ready for review
[2] 2025-01-28 10:05:00 ci: Build passed for branch feature/auth
2 message(s) import { TxClient } from '@jamesaphoenix/tx-agent-sdk'
const tx = new TxClient ({ apiUrl: 'http://localhost:3456' })
const messages = await tx.messages. inbox ( 'agent-1' , { limit: 10 , afterId: 42 })
// messages: Array<{ id, channel, sender, content, status, ... }> {
"tool" : "tx_msg_inbox" ,
"arguments" : {
"channel" : "agent-1" ,
"afterId" : 42 ,
"limit" : 10 ,
"sender" : "planner" ,
"includeAcked" : false
}
} Parameter Required Description channelYes Channel to read from afterIdNo Cursor: only messages with ID > this value limitNo Max messages (default: 50) senderNo Filter by sender correlationIdNo Filter by correlation ID includeAckedNo Include acknowledged messages
GET /api/messages/inbox/:channel Parameter Description afterIdCursor: only messages with ID > this value limitMax messages to return senderFilter by sender correlationIdFilter by correlation ID includeAckedInclude acknowledged messages (true/false)
{
"messages" : [
{
"id" : 1 ,
"channel" : "agent-1" ,
"sender" : "orchestrator" ,
"content" : "Task tx-abc123 is ready for review" ,
"status" : "pending" ,
"correlationId" : null ,
"taskId" : null ,
"metadata" : {},
"createdAt" : "2025-01-28T10:00:00.000Z" ,
"ackedAt" : null ,
"expiresAt" : null
}
],
"channel" : "agent-1" ,
"count" : 1
} curl http://localhost:3456/api/messages/inbox/agent-1?limit= 10 &afterId = 0
Mark a single message as acknowledged. This transitions its status from pending to acked.
CLI TypeScript SDK MCP REST API
tx msg ack < message-i d > [options] Argument Required Description <message-id>Yes Numeric message ID
Option Description --jsonOutput as JSON
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'
const tx = new TxClient ({ apiUrl: 'http://localhost:3456' })
const message = await tx.messages. ack ( 42 ) {
"tool" : "tx_msg_ack" ,
"arguments" : {
"id" : 42
}
} POST /api/messages/:id/ack {
"message" : {
"id" : 42 ,
"channel" : "agent-1" ,
"sender" : "orchestrator" ,
"content" : "Task tx-abc123 is ready for review" ,
"status" : "acked" ,
"correlationId" : null ,
"taskId" : null ,
"metadata" : {},
"createdAt" : "2025-01-28T10:00:00.000Z" ,
"ackedAt" : "2025-01-28T10:15:00.000Z" ,
"expiresAt" : null
}
} curl -X POST http://localhost:3456/api/messages/42/ack
Acknowledge all pending messages on a channel at once.
CLI TypeScript SDK MCP REST API
tx msg ack all < channe l > [options] Argument Required Description <channel>Yes Channel to acknowledge
Option Description --jsonOutput as JSON
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'
const tx = new TxClient ({ apiUrl: 'http://localhost:3456' })
const { channel , ackedCount } = await tx.messages. ackAll ( 'agent-1' ) {
"tool" : "tx_msg_ack_all" ,
"arguments" : {
"channel" : "agent-1"
}
} POST /api/messages/inbox/:channel/ack {
"channel" : "agent-1" ,
"ackedCount" : 5
} curl -X POST http://localhost:3456/api/messages/inbox/agent-1/ack
Count unacknowledged messages on a channel.
CLI TypeScript SDK MCP REST API
tx msg pending < channe l > [options] Option Description --jsonOutput as JSON
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'
const tx = new TxClient ({ apiUrl: 'http://localhost:3456' })
const count = await tx.messages. pending ( 'agent-1' ) {
"tool" : "tx_msg_pending" ,
"arguments" : {
"channel" : "agent-1"
}
} GET /api/messages/inbox/:channel/count {
"channel" : "agent-1" ,
"count" : 3
} curl http://localhost:3456/api/messages/inbox/agent-1/count
Clean up expired and old acknowledged messages.
CLI TypeScript SDK MCP REST API
Option Description --acked-older-than <hours>Remove acked messages older than N hours --jsonOutput as JSON
# Remove expired + acked messages older than 24 hours
tx msg gc --acked-older-than 24 import { TxClient } from '@jamesaphoenix/tx-agent-sdk'
const tx = new TxClient ({ apiUrl: 'http://localhost:3456' })
const { expired , acked } = await tx.messages. gc ({ ackedOlderThanHours: 24 }) {
"tool" : "tx_msg_gc" ,
"arguments" : {
"ackedOlderThanHours" : 24
}
} Parameter Required Description ackedOlderThanHoursNo Remove acked messages older than N hours (positive integer)
{
"expired" : 3 ,
"acked" : 12
} POST /api/messages/gc
Content-Type: application/json
{
"ackedOlderThanHours" : 24
} {
"expired" : 3 ,
"acked" : 12
} curl -X POST http://localhost:3456/api/messages/gc \
-H "Content-Type: application/json" \
-d '{"ackedOlderThanHours": 24}'
Every message contains:
Field Type Description idnumberAuto-incrementing message ID channelstringChannel the message belongs to senderstringWho sent the message contentstringMessage body status"pending" | "acked"Current status correlationIdstring | nullFor request/reply patterns taskIdstring | nullAssociated task ID metadataRecord<string, unknown>Arbitrary key-value data createdAtstringISO 8601 timestamp ackedAtstring | nullWhen the message was acknowledged expiresAtstring | nullWhen the message expires (from TTL)
Multiple agents can read from the same channel independently using cursor-based pagination. Each agent tracks its own afterId cursor:
# Agent 1 reads messages
CURSOR_1 = 0
MESSAGES = $( tx msg inbox shared-work --after $CURSOR_1 --json )
CURSOR_1 = $( echo " $MESSAGES " | jq -r '.[-1].id // 0' )
# Agent 2 reads the same messages independently
CURSOR_2 = 0
MESSAGES = $( tx msg inbox shared-work --after $CURSOR_2 --json )
CURSOR_2 = $( echo " $MESSAGES " | jq -r '.[-1].id // 0' )
Use correlationId to match responses to requests:
# Requester sends
tx msg send reviewer "Please review PR #42" --correlation review-42
# Responder reads and replies
tx msg inbox reviewer --json | jq -r '.[] | select(.correlationId == "review-42")'
tx msg send requester "LGTM, approved" --correlation review-42
# Requester reads the reply
tx msg inbox requester --correlation review-42 --json
Use task:<id> channel naming to scope messages to a task:
# Send updates about a specific task
tx msg send "task:tx-abc123" "Started implementation" --task tx-abc123
tx msg send "task:tx-abc123" "Tests passing, ready for review" --task tx-abc123
# Read task-specific discussion
tx msg inbox "task:tx-abc123"
#!/bin/bash
AGENT_ID = "worker- $$ "
CURSOR = 0
while true ; do
# Check for new messages
MESSAGES = $( tx msg inbox " $AGENT_ID " --after $CURSOR --json )
COUNT = $( echo " $MESSAGES " | jq length )
if [ " $COUNT " -gt 0 ]; then
# Update cursor
CURSOR = $( echo " $MESSAGES " | jq -r '.[-1].id' )
# Process each message
echo " $MESSAGES " | jq -c '.[]' | while read -r msg ; do
CONTENT = $( echo " $msg " | jq -r '.content' )
MSG_ID = $( echo " $msg " | jq -r '.id' )
# Handle the message
echo "Processing: $CONTENT "
# Acknowledge when done
tx msg ack " $MSG_ID "
done
fi
sleep 2
done