From 53e9798193c3bd7f6a7b3db97fdb4ff3f899fb0f Mon Sep 17 00:00:00 2001 From: Danil Nikolaev Date: Mon, 18 May 2026 20:43:35 +0300 Subject: [PATCH] Merge reply-chain documents into AI requests --- PIPELINE_TODO.md | 20 ++-- src/ai/conversation-pipeline.ts | 13 +++ src/ai/reply-chain-downloads.ts | 39 ++++++++ src/ai/unified-ai-request-pipeline.ts | 28 ++++-- src/ai/unified-ai-runner.shared.ts | 6 +- src/common/attachment-visibility.ts | 4 + src/common/message-part.ts | 3 + src/util/utils.ts | 53 ++++++++++- test/reply-chain-attachments.test.mjs | 131 ++++++++++++++++++++++++++ 9 files changed, 269 insertions(+), 28 deletions(-) create mode 100644 src/ai/reply-chain-downloads.ts create mode 100644 test/reply-chain-attachments.test.mjs diff --git a/PIPELINE_TODO.md b/PIPELINE_TODO.md index a917099..2e7a57a 100644 --- a/PIPELINE_TODO.md +++ b/PIPELINE_TODO.md @@ -106,16 +106,16 @@ ## 8. Улучшить поведение reply-chain с документами -- [ ] Явно описать стратегию merge: current user attachments + reply-chain user attachments. -- [ ] Исключать `scope: internal_artifact` всегда. -- [ ] Исключать `scope: bot_output`, если это не user-provided file. -- [ ] Если пользователь отвечает новым документом на ответ бота с предыдущим документом, использовать оба документа. -- [ ] Если пользователь отвечает текстом на ответ бота, использовать документы из reply-chain. -- [ ] Если пользователь явно говорит "этот файл", приоритет отдавать новому вложению. -- [ ] Если несколько документов, добавлять их имена в prompt/RAG context. -- [ ] Добавить tests на follow-up с новым документом. -- [ ] Добавить tests на follow-up без нового документа. -- [ ] Добавить tests на то, что RAG internal JSON не становится пользовательским документом. +- [x] Явно описать стратегию merge: current user attachments + reply-chain user attachments. +- [x] Исключать `scope: internal_artifact` всегда. +- [x] Исключать `scope: bot_output`, если это не user-provided file. +- [x] Если пользователь отвечает новым документом на ответ бота с предыдущим документом, использовать оба документа. +- [x] Если пользователь отвечает текстом на ответ бота, использовать документы из reply-chain. +- [x] Если пользователь явно говорит "этот файл", приоритет отдавать новому вложению. +- [x] Если несколько документов, добавлять их имена в prompt/RAG context. +- [x] Добавить tests на follow-up с новым документом. +- [x] Добавить tests на follow-up без нового документа. +- [x] Добавить tests на то, что RAG internal JSON не становится пользовательским документом. ## 9. Интеграционные tests без реальных Telegram/AI API diff --git a/src/ai/conversation-pipeline.ts b/src/ai/conversation-pipeline.ts index a30e844..fd4ea44 100644 --- a/src/ai/conversation-pipeline.ts +++ b/src/ai/conversation-pipeline.ts @@ -32,6 +32,7 @@ export type ConversationTurn = { content: string; deletedByBotAt?: number | null; attachments: ConversationAttachment[]; + documentNames?: string[]; }; export type ConversationSnapshot = { @@ -123,6 +124,13 @@ function attachmentSummary(attachments: ConversationAttachment[]): string { return ["[attachments]:", ...lines].join("\n"); } +function namesSummary(kind: string, names: string[]): string { + const filtered = names.map(name => name.trim()).filter(Boolean); + if (!filtered.length) return ""; + + return [`[${kind}]:`, ...filtered.map(name => `- ${name}`)].join("\n"); +} + function supportedAttachmentKinds(provider: AiProvider, bot: boolean): Set { if (bot) return new Set(); @@ -160,6 +168,10 @@ function renderContentText( parts.push("[message_state]: deleted_by_bot"); } + if (turn.documentNames?.length) { + parts.push(namesSummary("documents", turn.documentNames)); + } + if (unsupported.length) { parts.push(attachmentSummary(unsupported)); } @@ -291,6 +303,7 @@ export async function buildConversationSnapshot( content: part.content, deletedByBotAt: part.deletedByBotAt, attachments: buildConversationAttachments(part), + documentNames: part.documentNames, })); const imageCount = turns.reduce((sum, turn) => { diff --git a/src/ai/reply-chain-downloads.ts b/src/ai/reply-chain-downloads.ts new file mode 100644 index 0000000..b6923c3 --- /dev/null +++ b/src/ai/reply-chain-downloads.ts @@ -0,0 +1,39 @@ +import type {AiDownloadedFile} from "./telegram-attachments.js"; + +function downloadKey(download: AiDownloadedFile): string { + return [ + download.kind, + download.fileId, + download.sha256 ?? "", + download.fileName, + ].join(":"); +} + +export function mergeReplyChainDownloads( + currentDownloads: readonly AiDownloadedFile[], + replyChainDownloads: readonly AiDownloadedFile[], +): AiDownloadedFile[] { + const result: AiDownloadedFile[] = []; + const seen = new Set(); + + for (const download of [...currentDownloads, ...replyChainDownloads]) { + const key = downloadKey(download); + if (seen.has(key)) continue; + seen.add(key); + result.push(download); + } + + return result; +} + +export function shouldPreferCurrentDownloads(text: string, currentDownloads: readonly AiDownloadedFile[]): boolean { + if (!currentDownloads.length) return false; + + const normalized = text.trim().toLowerCase(); + if (!normalized) return false; + + return normalized.includes("this file") + || normalized.includes("this document") + || normalized.includes("этот файл") + || normalized.includes("этот документ"); +} diff --git a/src/ai/unified-ai-request-pipeline.ts b/src/ai/unified-ai-request-pipeline.ts index ca355b1..20a7be4 100644 --- a/src/ai/unified-ai-request-pipeline.ts +++ b/src/ai/unified-ai-request-pipeline.ts @@ -4,7 +4,8 @@ import {Environment} from "../common/environment"; import {UserRequestPipeline, type UserRequestPipelineState, type UserRequestPipelineStage} from "./user-request-pipeline"; import {PipelineFallbackNotifier} from "./user-request-pipeline/fallback-notifier"; import {buildToolRankFallbackTargetDetails} from "./user-request-pipeline/fallback-target-details"; -import type {AiDownloadedFile} from "./telegram-attachments"; +import {mergeReplyChainDownloads, shouldPreferCurrentDownloads} from "./reply-chain-downloads"; +import {attachmentsToDownloadedFiles, type AiDownloadedFile} from "./telegram-attachments"; import type {TelegramStreamMessage} from "./telegram-stream-message"; import type {ChatMessage} from "./chat-messages-types"; import type {OpenAIChatMessage} from "./openai-chat-message"; @@ -23,6 +24,7 @@ import { stripAudioFromRunnerMessages, toolRuntimeContextFromDownloads, transcribeAudioIfNeeded, + collectStoredReplyChainAttachments, UnifiedRunOptions, } from "./unified-ai-runner.shared"; import {aiLog} from "../logging/ai-logger"; @@ -92,6 +94,12 @@ export async function prepareUnifiedAiRequestPipeline(params: { controller: AbortController; }): Promise { const {options, config, downloads, streamMessage, controller} = params; + const replyChainDownloads = shouldPreferCurrentDownloads(options.text, downloads) + ? downloads + : mergeReplyChainDownloads( + downloads, + attachmentsToDownloadedFiles(await collectStoredReplyChainAttachments(options.msg)), + ); const prepared: MutablePreparedContext = { chatMessages: [], imageCount: 0, @@ -111,7 +119,7 @@ export async function prepareUnifiedAiRequestPipeline(params: { details: { phase: "ai_request_prepare", provider: options.provider, - downloads: downloads.map(download => ({ + downloads: replyChainDownloads.map(download => ({ kind: download.kind, fileName: download.fileName, mimeType: download.mimeType, @@ -128,15 +136,15 @@ export async function prepareUnifiedAiRequestPipeline(params: { options.msg, options.text, options.provider, - downloads, + replyChainDownloads, config, runtimeTargetFor(options, config), options.responseLanguage ?? DEFAULT_AI_RESPONSE_LANGUAGE, ); prepared.chatMessages = collected.chatMessages as typeof prepared.chatMessages; prepared.imageCount = collected.imageCount; - prepared.firstRoundStatus = initialStatus(downloads, prepared.imageCount); - prepared.toolContext = toolRuntimeContextFromDownloads(downloads); + prepared.firstRoundStatus = initialStatus(replyChainDownloads, prepared.imageCount); + prepared.toolContext = toolRuntimeContextFromDownloads(replyChainDownloads); return { stage: "collect_conversation_context", @@ -171,11 +179,11 @@ export async function prepareUnifiedAiRequestPipeline(params: { prepared.transcript = await transcribeAudioIfNeeded( options.provider, options.msg.from?.id, - downloads, + replyChainDownloads, streamMessage, controller.signal, ).catch(error => { - if (downloads.some(isTranscribableAudioDownload)) throw error; + if (replyChainDownloads.some(isTranscribableAudioDownload)) throw error; return ""; }); @@ -190,7 +198,7 @@ export async function prepareUnifiedAiRequestPipeline(params: { const transcriptArtifact = await persistTranscriptArtifactAttachment({ provider: options.provider, transcript, - downloads, + downloads: replyChainDownloads, chatId: options.msg.chat.id, messageId: options.msg.message_id, }); @@ -235,7 +243,7 @@ export async function prepareUnifiedAiRequestPipeline(params: { prepared.preparedDocumentRag = await prepareDocumentRag( options.provider, - downloads, + replyChainDownloads, prepared.chatMessages, streamMessage, config, @@ -246,7 +254,7 @@ export async function prepareUnifiedAiRequestPipeline(params: { const ragArtifact = await persistRagArtifactAttachment({ provider: options.provider, prepared: prepared.preparedDocumentRag, - downloads, + downloads: replyChainDownloads, chatId: options.msg.chat.id, messageId: options.msg.message_id, details: prepared.preparedDocumentRag?.provider === AiProvider.OPENAI diff --git a/src/ai/unified-ai-runner.shared.ts b/src/ai/unified-ai-runner.shared.ts index 8c34a08..acbe648 100644 --- a/src/ai/unified-ai-runner.shared.ts +++ b/src/ai/unified-ai-runner.shared.ts @@ -34,7 +34,7 @@ import {aiLog, aiLogDuration, aiLogProviderTarget, aiLogToolCall} from "../loggi import {buildConversationSnapshot, serializeConversationSnapshot} from "./conversation-pipeline.js"; import type {ResponseInputMessageContentList} from "openai/resources/responses/responses"; import {persistToolResultArtifactAttachment} from "./tool-result-artifact-store.js"; -import {filterUserVisibleStoredAttachments} from "../common/attachment-visibility.js"; +import {filterUserInputStoredAttachments} from "../common/attachment-visibility.js"; export type {Message} from "typescript-telegram-bot-api"; export type {AiRuntimeTarget} from "./ai-runtime-target"; @@ -515,13 +515,13 @@ export function addMessageAttachmentKinds(msg: Message | undefined, kinds: Set { +export async function collectStoredReplyChainAttachments(msg: Message, limit: number = 40): Promise { const attachments: StoredAttachment[] = []; const seen = new Set(); let current = await MessageStore.get(msg.chat.id, msg.message_id); for (let i = 0; current && i < limit; i++) { - for (const attachment of filterUserVisibleStoredAttachments(current?.attachments ?? [])) { + for (const attachment of filterUserInputStoredAttachments(current?.attachments ?? [])) { const key = [ attachment.kind, attachment.fileUniqueId || attachment.fileId, diff --git a/src/common/attachment-visibility.ts b/src/common/attachment-visibility.ts index 583c3a0..7a5aae2 100644 --- a/src/common/attachment-visibility.ts +++ b/src/common/attachment-visibility.ts @@ -3,3 +3,7 @@ import type {StoredAttachment} from "../model/stored-attachment"; export function filterUserVisibleStoredAttachments(attachments: StoredAttachment[]): StoredAttachment[] { return attachments.filter(attachment => attachment.scope !== "internal_artifact"); } + +export function filterUserInputStoredAttachments(attachments: StoredAttachment[]): StoredAttachment[] { + return attachments.filter(attachment => attachment.scope === "user_input" || attachment.scope === undefined); +} diff --git a/src/common/message-part.ts b/src/common/message-part.ts index c10d268..b01c7b4 100644 --- a/src/common/message-part.ts +++ b/src/common/message-part.ts @@ -20,6 +20,9 @@ export type MessagePart = { audios?: string[]; audioParts?: MessageAudioPart[]; documents?: string[]; + documentNames?: string[]; videos?: string[]; videoNotes?: string[]; + videoNames?: string[]; + videoNoteNames?: string[]; } diff --git a/src/util/utils.ts b/src/util/utils.ts index bbbc31c..58802c3 100644 --- a/src/util/utils.ts +++ b/src/util/utils.ts @@ -27,6 +27,7 @@ import {UserStore} from "../common/user-store.js"; import fs from "node:fs"; import path from "node:path"; import {MessageStore} from "../common/message-store.js"; +import {filterUserInputStoredAttachments} from "../common/attachment-visibility.js"; import {SystemInfo} from "../commands/system-info.js"; import {PrefixResponse} from "../commands/prefix-response.js"; import {ChatCommand} from "../base/chat-command.js"; @@ -1487,12 +1488,13 @@ export async function collectReplyChainText(options: ReplyChainOptions): Promise const cleanText = cutPrefix ? cutPrefixes(rawText) : rawText; const imageNames = await loadImagesIfExists(msg); const messageDownloads = includeDownloads ? downloads : []; - const storedImageAttachments = isStoredMessage(msg) - ? (msg.attachments ?? []).filter(attachment => attachment.kind === "image" && fs.existsSync(attachment.cachePath)) + const storedAttachments = isStoredMessage(msg) + ? filterUserInputStoredAttachments(msg.attachments ?? []).filter(attachment => fs.existsSync(attachment.cachePath)) : []; + const storedImageAttachments = storedAttachments.filter(attachment => attachment.kind === "image"); if (!cleanText && !quoteText && textRequired) return; - if (!cleanText && !quoteText && !imageNames?.length && !storedImageAttachments.length && !messageDownloads.length) return; + if (!cleanText && !quoteText && !imageNames?.length && !storedAttachments.length && !messageDownloads.length) return; const fromId = isStoredMessage(msg) ? msg.fromId : msg.from?.id; const user = await UserStore.get(isStoredMessage(msg) ? msg.fromId : msg.from?.id ?? -1); @@ -1527,11 +1529,19 @@ export async function collectReplyChainText(options: ReplyChainOptions): Promise }); const imageParts = [...photoImageParts, ...cachedImageParts]; + const storedDocumentAttachments = storedAttachments.filter(attachment => attachment.kind === "document"); + const storedVideoAttachments = storedAttachments.filter(attachment => attachment.kind === "video"); + const storedVideoNoteAttachments = storedAttachments.filter(attachment => attachment.kind === "video-note"); + const storedAudioAttachments = storedAttachments.filter(attachment => attachment.kind === "audio"); + const audios: string[] = []; const audioParts: MessageAudioPart[] = []; const documents: string[] = []; + const documentNames: string[] = []; const videos: string[] = []; + const videoNames: string[] = []; const videoNotes: string[] = []; + const videoNoteNames: string[] = []; if (messageDownloads.length) { messageDownloads @@ -1544,21 +1554,51 @@ export async function collectReplyChainText(options: ReplyChainOptions): Promise messageDownloads .filter(d => d.kind === "document") - .forEach(d => documents.push(d.buffer.toString("base64"))); + .forEach(d => { + documents.push(d.buffer.toString("base64")); + documentNames.push(d.fileName); + }); messageDownloads .filter(d => d.kind === "video") - .forEach(v => videos.push(v.buffer.toString("base64"))); + .forEach(v => { + videos.push(v.buffer.toString("base64")); + videoNames.push(v.fileName); + }); messageDownloads .filter(d => d.kind === "video-note") .forEach(v => { const data = v.buffer.toString("base64"); videoNotes.push(data); + videoNoteNames.push(v.fileName); audioParts.push({data, mimeType: mimeTypeFromAudioDownload(v)}); }); } + storedAudioAttachments.forEach(attachment => { + const data = Buffer.from(fs.readFileSync(attachment.cachePath)).toString("base64"); + audios.push(data); + audioParts.push({data, mimeType: attachment.mimeType || "audio/ogg"}); + }); + + storedDocumentAttachments.forEach(attachment => { + documents.push(Buffer.from(fs.readFileSync(attachment.cachePath)).toString("base64")); + documentNames.push(attachment.fileName); + }); + + storedVideoAttachments.forEach(attachment => { + videos.push(Buffer.from(fs.readFileSync(attachment.cachePath)).toString("base64")); + videoNames.push(attachment.fileName); + }); + + storedVideoNoteAttachments.forEach(attachment => { + const data = Buffer.from(fs.readFileSync(attachment.cachePath)).toString("base64"); + videoNotes.push(data); + videoNoteNames.push(attachment.fileName); + audioParts.push({data, mimeType: attachment.mimeType || "video/mp4"}); + }); + const content = [ quoteText ? `[citation]:\n${quoteText}\n\n[message]:\n` : "", cleanText ?? "" @@ -1576,8 +1616,11 @@ export async function collectReplyChainText(options: ReplyChainOptions): Promise audios: audios.length ? audios : undefined, audioParts: audioParts.length ? audioParts : undefined, documents: documents.length ? documents : undefined, + documentNames: documentNames.length ? documentNames : undefined, videos: videos.length ? videos : undefined, + videoNames: videoNames.length ? videoNames : undefined, videoNotes: videoNotes.length ? videoNotes : undefined, + videoNoteNames: videoNoteNames.length ? videoNoteNames : undefined, }); } }; diff --git a/test/reply-chain-attachments.test.mjs b/test/reply-chain-attachments.test.mjs new file mode 100644 index 0000000..837c7a1 --- /dev/null +++ b/test/reply-chain-attachments.test.mjs @@ -0,0 +1,131 @@ +import test from "node:test"; +import assert from "node:assert/strict"; + +const {filterUserInputStoredAttachments} = await import("../dist/common/attachment-visibility.js"); +const {mergeReplyChainDownloads, shouldPreferCurrentDownloads} = await import("../dist/ai/reply-chain-downloads.js"); + +test("reply chain attachment visibility keeps only user input attachments", () => { + const attachments = filterUserInputStoredAttachments([ + { + kind: "document", + fileId: "user-doc", + fileName: "user.txt", + cachePath: "/tmp/user.txt", + scope: "user_input", + }, + { + kind: "document", + fileId: "bot-doc", + fileName: "bot.txt", + cachePath: "/tmp/bot.txt", + scope: "bot_output", + }, + { + kind: "document", + fileId: "internal-doc", + fileName: "internal.json", + cachePath: "/tmp/internal.json", + scope: "internal_artifact", + }, + ]); + + assert.equal(attachments.length, 1); + assert.equal(attachments[0].fileId, "user-doc"); +}); + +test("reply chain downloads keep current input first and deduplicate chain copies", () => { + const merged = mergeReplyChainDownloads( + [ + { + kind: "document", + fileId: "new-doc", + fileName: "new.txt", + buffer: Buffer.from("new"), + path: "/tmp/new.txt", + }, + { + kind: "document", + fileId: "shared-doc", + fileName: "shared.txt", + buffer: Buffer.from("current"), + path: "/tmp/current-shared.txt", + }, + ], + [ + { + kind: "document", + fileId: "shared-doc", + fileName: "shared.txt", + buffer: Buffer.from("reply-chain"), + path: "/tmp/reply-shared.txt", + }, + { + kind: "document", + fileId: "old-doc", + fileName: "old.txt", + buffer: Buffer.from("old"), + path: "/tmp/old.txt", + }, + ], + ); + + assert.equal(merged.length, 3); + assert.equal(merged[0].fileId, "new-doc"); + assert.equal(merged[1].fileId, "shared-doc"); + assert.equal(merged[2].fileId, "old-doc"); +}); + +test("reply chain downloads are used when there is no new document", () => { + const merged = mergeReplyChainDownloads([], [ + { + kind: "document", + fileId: "reply-doc", + fileName: "reply.txt", + buffer: Buffer.from("reply"), + path: "/tmp/reply.txt", + }, + ]); + + assert.equal(merged.length, 1); + assert.equal(merged[0].fileId, "reply-doc"); +}); + +test("reply chain prefers current downloads when user points to this file", () => { + assert.equal( + shouldPreferCurrentDownloads("Please answer about this file", [{ + kind: "document", + fileId: "new-doc", + fileName: "new.txt", + buffer: Buffer.from("new"), + path: "/tmp/new.txt", + }]), + true, + ); + + assert.equal( + shouldPreferCurrentDownloads("ответь по этому файлу", [{ + kind: "document", + fileId: "new-doc", + fileName: "new.txt", + buffer: Buffer.from("new"), + path: "/tmp/new.txt", + }]), + false, + ); + + assert.equal( + shouldPreferCurrentDownloads("ответь на этот файл", [{ + kind: "document", + fileId: "new-doc", + fileName: "new.txt", + buffer: Buffer.from("new"), + path: "/tmp/new.txt", + }]), + true, + ); + + assert.equal( + shouldPreferCurrentDownloads("this file", []), + false, + ); +});