我们通过一个小型进程内队列序列化入站自动回复运行(所有频道),以防止多个 agent 运行发生冲突,同时仍允许跨会话的安全并行。
为什么需要队列
- 自动回复运行可能成本高昂(LLM 调用),当多个入站消息接近到达时可能会发生冲突
- 序列化可避免争夺共享资源(会话文件、日志、CLI stdin)并减少上游速率限制的机会
工作原理
- 通道感知的 FIFO 队列使用可配置的并发上限排空每个通道(未配置通道的默认值为 1;main 默认为 4,subagent 默认为 8)
runEmbeddedPiAgent按会话键(通道session:)排队以保证每个会话只有一个活动运行- 然后将每个会话运行排入全局通道(默认为
main),以便通过agents.defaults.maxConcurrent限制总体并行度 - 启用详细日志记录时,如果排队运行在启动前等待超过 ~2 秒,则会发出简短通知
- 输入指示器仍在入队时立即触发(当频道支持时),因此在等待期间用户体验不变
队列模式(每个频道)
入站消息可以引导当前运行、等待后续轮次或两者兼而有之:
steer:立即注入到当前运行中(在下一个工具边界后取消待处理的工具调用)。如果未流式传输,则回退到 followupfollowup:在当前运行结束后为下一个 agent 轮次排队collect:将所有排队的消息合并为单个后续轮次(默认)。如果消息针对不同的频道/主题,它们会单独排空以保留路由steer-backlog(也称为steer+backlog):立即引导并保留消息以进行后续轮次interrupt(旧版):中止该会话的活动运行,然后运行最新消息queue(旧版别名):与steer相同
Steer-backlog 意味着您可以在引导运行后获得后续响应,因此流式传输表面可能看起来像重复。如果您希望每个入站消息一个响应,请首选 collect/steer。发送 /queue collect 作为独立命令(每会话)或设置 messages.queue.byChannel.discord: "collect"。
默认值(配置中未设置时):
- 所有界面 →
collect
通过 messages.queue 全局或按频道配置:
{
messages: {
queue: {
mode: "collect",
debounceMs: 1000,
cap: 20,
drop: "summarize",
byChannel: {
discord: "collect"
},
},
},
}
队列选项
选项适用于 followup、collect 和 steer-backlog(以及当 steer 回退到 followup 时):
debounceMs:在开始后续轮次之前等待静默(防止"继续,继续")cap:每个会话的最大排队消息数drop:溢出策略(old、new、summarize)。Summarize 保留删除消息的简短项目符号列表,并将其作为合成后续提示注入
默认值:debounceMs: 1000、cap: 20、drop: summarize。
每会话覆盖
- 发送
/queue <mode>作为独立命令以存储当前会话的模式 - 可以组合选项:
/queue collect debounce:2s cap:25 drop:summarize /queue default或/queue reset清除会话覆盖
范围和保证
- 适用于使用 gateway 回复管道的所有入站频道的自动回复 agent 运行(WhatsApp web、Telegram、Slack、Discord、Signal、iMessage、webchat 等)
- 默认通道(
main)是进程范围的,用于入站 + 主心跳;设置agents.defaults.maxConcurrent以允许多个会话并行 - 可能存在其他通道(例如
cron、subagent),因此后台作业可以并行运行而不会阻塞入站回复 - 每会话通道保证一次只有一个 agent 运行接触给定会话
- 无外部依赖或后台工作线程;纯 TypeScript + promises
故障排除
- 如果命令似乎卡住,启用详细日志并查找"排队 ...ms"行以确认队列正在排空
- 如果需要队列深度,启用详细日志并观察队列时间行