From 507b15aa5f29e0e481bb9598a4fcc22b701da057 Mon Sep 17 00:00:00 2001 From: Danil Nikolaev Date: Mon, 18 May 2026 20:13:19 +0300 Subject: [PATCH] Add centralized pipeline fallback notifier --- PIPELINE_TODO.md | 12 +++--- src/ai/unified-ai-request-pipeline.ts | 17 +++++++++ src/ai/unified-ai-response-pipeline.ts | 17 +++++++++ .../fallback-notifier-registry.ts | 16 ++++++++ .../fallback-notifier-text.ts | 27 +++++++++++++ .../fallback-notifier.ts | 38 +++++++++++++++++++ src/common/environment.ts | 28 ++++++++++++++ test/pipeline-fallback-notifier.test.mjs | 28 ++++++++++++++ 8 files changed, 177 insertions(+), 6 deletions(-) create mode 100644 src/ai/user-request-pipeline/fallback-notifier-registry.ts create mode 100644 src/ai/user-request-pipeline/fallback-notifier-text.ts create mode 100644 src/ai/user-request-pipeline/fallback-notifier.ts create mode 100644 test/pipeline-fallback-notifier.test.mjs diff --git a/PIPELINE_TODO.md b/PIPELINE_TODO.md index 589d2b9..4320088 100644 --- a/PIPELINE_TODO.md +++ b/PIPELINE_TODO.md @@ -94,15 +94,15 @@ ## 7. Довести fallback notifications до централизованного UX -- [ ] Добавить `PipelineFallbackNotifier`. -- [ ] Для `notify_user` отправлять пользователю понятное сообщение. -- [ ] Для `continue_without_stage` писать короткий debug/audit без user notification. +- [x] Добавить `PipelineFallbackNotifier`. +- [x] Для `notify_user` отправлять пользователю понятное сообщение. +- [x] Для `continue_without_stage` писать короткий debug/audit без user notification. - [ ] Для `use_alternate_target` логировать исходный и alternate target. - [ ] Для `fail_request` завершать request через единый error path. - [ ] Добавить локализацию fallback messages. -- [ ] Добавить отдельные тексты для RAG failure, STT failure, TTS failure, tool failure. -- [ ] Не спамить пользователя несколькими fallback notifications за один request. -- [ ] Сохранять fallback notification в `request_audit.details`. +- [x] Добавить отдельные тексты для RAG failure, STT failure, TTS failure, tool failure. +- [x] Не спамить пользователя несколькими fallback notifications за один request. +- [x] Сохранять fallback notification в `request_audit.details`. ## 8. Улучшить поведение reply-chain с документами diff --git a/src/ai/unified-ai-request-pipeline.ts b/src/ai/unified-ai-request-pipeline.ts index 333c4cd..2a570ea 100644 --- a/src/ai/unified-ai-request-pipeline.ts +++ b/src/ai/unified-ai-request-pipeline.ts @@ -2,6 +2,7 @@ import {AiProvider} from "../model/ai-provider"; import {AI_VOICE_MODE_TRANSCRIPT, DEFAULT_AI_RESPONSE_LANGUAGE} from "../common/user-ai-settings"; import {Environment} from "../common/environment"; import {UserRequestPipeline, type UserRequestPipelineState, type UserRequestPipelineStage} from "./user-request-pipeline"; +import {PipelineFallbackNotifier} from "./user-request-pipeline/fallback-notifier"; import type {AiDownloadedFile} from "./telegram-attachments"; import type {TelegramStreamMessage} from "./telegram-stream-message"; import type {ChatMessage} from "./chat-messages-types"; @@ -290,6 +291,7 @@ export async function prepareUnifiedAiRequestPipeline(params: { ]; const state = createAiRequestPipelineState(options); + const fallbackNotifier = new PipelineFallbackNotifier(options.msg); const pipeline = new UserRequestPipeline({ stages, stageNames: [ @@ -301,6 +303,21 @@ export async function prepareUnifiedAiRequestPipeline(params: { "document_rag", "audit_finish", ], + onFallback: async decision => { + const notification = await fallbackNotifier.notify(state.requestId, decision); + state.audit.push({ + stage: decision.stage, + status: "fallback", + startedAt: nowIso(), + finishedAt: nowIso(), + details: { + fallbackAction: decision.action, + fallbackNotification: notification.text, + fallbackNotified: notification.notified, + reason: decision.reason, + }, + }); + }, }); await pipeline.run(state, controller.signal); await streamMessage.storePipelineAudit(state.audit); diff --git a/src/ai/unified-ai-response-pipeline.ts b/src/ai/unified-ai-response-pipeline.ts index 8f6d4a4..8497627 100644 --- a/src/ai/unified-ai-response-pipeline.ts +++ b/src/ai/unified-ai-response-pipeline.ts @@ -24,6 +24,7 @@ import {runMistral} from "./unified-ai-runner.mistral"; import {summarizeModelOutput} from "./response-model-output"; import {summarizeToolLoop} from "./tool-loop-summary"; import {persistToolLoopSummaryArtifactAttachment} from "./tool-loop-artifact-store"; +import {PipelineFallbackNotifier} from "./user-request-pipeline/fallback-notifier"; import { resolveTextToSpeechProviderForUser, sendSynthesizedSpeech, @@ -165,6 +166,7 @@ export async function runUnifiedAiResponsePipeline(params: { }): Promise { const {options, config, downloads, prepared, streamMessage, controller} = params; const state = createResponsePipelineState(options); + const fallbackNotifier = new PipelineFallbackNotifier(options.msg); const adapter = getProviderAdapter(options.provider); let selectedToolNames: string[] = []; let filteredTools: unknown[] = []; @@ -392,6 +394,21 @@ export async function runUnifiedAiResponsePipeline(params: { "persist_output_artifacts", "audit_finish", ], + onFallback: async decision => { + const notification = await fallbackNotifier.notify(state.requestId, decision); + state.audit.push({ + stage: decision.stage, + status: "fallback", + startedAt: new Date().toISOString(), + finishedAt: new Date().toISOString(), + details: { + fallbackAction: decision.action, + fallbackNotification: notification.text, + fallbackNotified: notification.notified, + reason: decision.reason, + }, + }); + }, }); try { diff --git a/src/ai/user-request-pipeline/fallback-notifier-registry.ts b/src/ai/user-request-pipeline/fallback-notifier-registry.ts new file mode 100644 index 0000000..13f609b --- /dev/null +++ b/src/ai/user-request-pipeline/fallback-notifier-registry.ts @@ -0,0 +1,16 @@ +import type {PipelineFallbackDecision} from "./fallback-executor.js"; + +export function fallbackNotificationKey(requestId: string, decision: PipelineFallbackDecision): string { + return `${requestId}:${decision.stage}:${decision.action}`; +} + +export class PipelineFallbackNotificationRegistry { + private readonly notifiedKeys = new Set(); + + claim(requestId: string, decision: PipelineFallbackDecision): boolean { + const key = fallbackNotificationKey(requestId, decision); + if (this.notifiedKeys.has(key)) return false; + this.notifiedKeys.add(key); + return true; + } +} diff --git a/src/ai/user-request-pipeline/fallback-notifier-text.ts b/src/ai/user-request-pipeline/fallback-notifier-text.ts new file mode 100644 index 0000000..d375395 --- /dev/null +++ b/src/ai/user-request-pipeline/fallback-notifier-text.ts @@ -0,0 +1,27 @@ +import type {PipelineFallbackAction, PipelineStageName} from "./types.js"; + +const DEFAULT_TEXT = "⚠️ I had to skip part of the request, but I can continue."; +const NOTIFY_TEXT = "⚠️ I hit a problem and need to continue with a fallback."; +const FAIL_TEXT = "⚠️ I could not finish this request."; +const RAG_TEXT = "⚠️ Document retrieval failed, so I will answer without RAG."; +const STT_TEXT = "⚠️ Speech transcription failed, so I will continue without the audio transcript."; +const TTS_TEXT = "⚠️ Text-to-speech failed, so I will continue without audio output."; +const TOOL_TEXT = "⚠️ Tool execution failed, so I will continue without that tool."; + +export function resolvePipelineFallbackText(stage: PipelineStageName, action: PipelineFallbackAction): string | undefined { + if (action === "continue_without_stage") return undefined; + if (action === "fail_request") return FAIL_TEXT; + + switch (stage) { + case "speech_to_text": + return STT_TEXT; + case "document_rag": + return RAG_TEXT; + case "tool_loop": + return TOOL_TEXT; + case "text_to_speech": + return TTS_TEXT; + default: + return action === "notify_user" ? NOTIFY_TEXT : DEFAULT_TEXT; + } +} diff --git a/src/ai/user-request-pipeline/fallback-notifier.ts b/src/ai/user-request-pipeline/fallback-notifier.ts new file mode 100644 index 0000000..f4ae06c --- /dev/null +++ b/src/ai/user-request-pipeline/fallback-notifier.ts @@ -0,0 +1,38 @@ +import type {Message} from "typescript-telegram-bot-api"; +import {replyToMessage, logError} from "../../util/utils.js"; +import type {PipelineFallbackDecision} from "./fallback-executor.js"; +import {PipelineFallbackNotificationRegistry} from "./fallback-notifier-registry.js"; +import {resolvePipelineFallbackText} from "./fallback-notifier-text.js"; + +export class PipelineFallbackNotifier { + private readonly registry = new PipelineFallbackNotificationRegistry(); + + constructor( + private readonly sourceMessage: Message, + private readonly sendFallbackMessage: (text: string) => Promise = async text => { + await replyToMessage({ + message: this.sourceMessage, + text, + }); + }, + ) {} + + async notify(requestId: string, decision: PipelineFallbackDecision): Promise<{notified: boolean; text?: string}> { + if (!this.registry.claim(requestId, decision)) { + return {notified: false}; + } + + const text = resolvePipelineFallbackText(decision.stage, decision.action); + if (!text) { + return {notified: false}; + } + + try { + await this.sendFallbackMessage(text); + return {notified: true, text}; + } catch (error) { + logError(error instanceof Error ? error : String(error)); + return {notified: false, text}; + } + } +} diff --git a/src/common/environment.ts b/src/common/environment.ts index dc45976..e4429ba 100644 --- a/src/common/environment.ts +++ b/src/common/environment.ts @@ -823,6 +823,34 @@ export class Environment { return this.text("noTextToSynthesizeText", "No text to synthesize."); } + static get pipelineFallbackGenericText() { + return this.text("pipelineFallbackGenericText", "⚠️ I had to skip part of the request, but I can continue."); + } + + static get pipelineFallbackNotifyText() { + return this.text("pipelineFallbackNotifyText", "⚠️ I hit a problem and need to continue with a fallback."); + } + + static get pipelineFallbackFailText() { + return this.text("pipelineFallbackFailText", "⚠️ I could not finish this request."); + } + + static get pipelineFallbackRagText() { + return this.text("pipelineFallbackRagText", "⚠️ Document retrieval failed, so I will answer without RAG."); + } + + static get pipelineFallbackSpeechToTextText() { + return this.text("pipelineFallbackSpeechToTextText", "⚠️ Speech transcription failed, so I will continue without the audio transcript."); + } + + static get pipelineFallbackTextToSpeechText() { + return this.text("pipelineFallbackTextToSpeechText", "⚠️ Text-to-speech failed, so I will continue without audio output."); + } + + static get pipelineFallbackToolText() { + return this.text("pipelineFallbackToolText", "⚠️ Tool execution failed, so I will continue without that tool."); + } + static get mistralTtsNoAudioDataText() { return this.text("mistralTtsNoAudioDataText", "Mistral TTS did not return audioData."); } diff --git a/test/pipeline-fallback-notifier.test.mjs b/test/pipeline-fallback-notifier.test.mjs new file mode 100644 index 0000000..43805eb --- /dev/null +++ b/test/pipeline-fallback-notifier.test.mjs @@ -0,0 +1,28 @@ +import test from "node:test"; +import assert from "node:assert/strict"; + +const {PipelineFallbackNotificationRegistry} = await import("../dist/ai/user-request-pipeline/fallback-notifier-registry.js"); +const {resolvePipelineFallbackText} = await import("../dist/ai/user-request-pipeline/fallback-notifier-text.js"); + +test("pipeline fallback text maps notify_user to a user-facing message", () => { + assert.match(resolvePipelineFallbackText("document_rag", "notify_user"), /RAG/i); + assert.match(resolvePipelineFallbackText("speech_to_text", "notify_user"), /transcription/i); + assert.match(resolvePipelineFallbackText("tool_loop", "notify_user"), /tool/i); +}); + +test("pipeline fallback text stays silent for continue_without_stage", () => { + assert.equal(resolvePipelineFallbackText("document_rag", "continue_without_stage"), undefined); + assert.equal(resolvePipelineFallbackText("tool_loop", "continue_without_stage"), undefined); +}); + +test("pipeline fallback notification registry deduplicates one request-stage-action", () => { + const registry = new PipelineFallbackNotificationRegistry(); + const decision = { + stage: "tool_loop", + action: "notify_user", + }; + + assert.equal(registry.claim("request-1", decision), true); + assert.equal(registry.claim("request-1", decision), false); + assert.equal(registry.claim("request-2", decision), true); +});