diff --git a/PIPELINE_TODO.md b/PIPELINE_TODO.md index 0a9c716..1a137ef 100644 --- a/PIPELINE_TODO.md +++ b/PIPELINE_TODO.md @@ -119,19 +119,19 @@ ## 9. Интеграционные tests без реальных Telegram/AI API -- [ ] Создать fake `TelegramStreamMessage`. -- [ ] Создать fake provider adapter. -- [ ] Создать fake message store или in-memory DB fixture. -- [ ] Test: oversized input attachment rejected before download. -- [ ] Test: document input creates RAG artifact. -- [ ] Test: voice input creates transcript artifact. -- [ ] Test: final answer creates final_text artifact. -- [ ] Test: thrown error creates error artifact. -- [ ] Test: tool call creates tool_result artifact. -- [ ] Test: generated file creates generated_file artifact. -- [ ] Test: TTS requested creates tts_audio artifact. -- [ ] Test: fallback `continue_without_stage` continues request. -- [ ] Test: fallback `fail_request` stops request. +- [x] Создать fake `TelegramStreamMessage`. +- [x] Создать fake provider adapter. +- [x] Создать fake message store или in-memory DB fixture. +- [x] Test: oversized input attachment rejected before download. +- [x] Test: document input creates RAG artifact. +- [x] Test: voice input creates transcript artifact. +- [x] Test: final answer creates final_text artifact. +- [x] Test: thrown error creates error artifact. +- [x] Test: tool call creates tool_result artifact. +- [x] Test: generated file creates generated_file artifact. +- [x] Test: TTS requested creates tts_audio artifact. +- [x] Test: fallback `continue_without_stage` continues request. +- [x] Test: fallback `fail_request` stops request. ## 10. Operational cleanup and observability diff --git a/test/pipeline-integration.test.mjs b/test/pipeline-integration.test.mjs new file mode 100644 index 0000000..4e58517 --- /dev/null +++ b/test/pipeline-integration.test.mjs @@ -0,0 +1,398 @@ +import test from "node:test"; +import assert from "node:assert/strict"; + +const {UserRequestPipeline} = await import("../dist/ai/user-request-pipeline/pipeline.js"); +const {PIPELINE_ATTACHMENT_LIMIT_BYTES} = await import("../dist/ai/user-request-pipeline/types.js"); + +class FakeTelegramStreamMessage { + constructor() { + this.status = ""; + this.text = ""; + this.toolExecutions = []; + this.outputAttachments = []; + this.internalAttachments = []; + this.pipelineAudits = []; + this.finished = false; + this.failed = false; + } + + setStatus(status) { + this.status = status; + } + + clearStatus() { + this.status = ""; + } + + append(delta) { + this.text += delta; + } + + replaceText(text) { + this.text = text; + } + + getText() { + return this.text; + } + + recordToolExecution(record) { + this.toolExecutions.push(record); + } + + getToolExecutions() { + return [...this.toolExecutions]; + } + + recordOutputAttachment(record) { + this.outputAttachments.push(record); + } + + getOutputAttachments() { + return [...this.outputAttachments]; + } + + async storeInternalAttachment(attachment) { + this.internalAttachments.push(attachment); + } + + async storePipelineAudit(events) { + this.pipelineAudits.push(...events); + } + + async finish() { + this.finished = true; + } + + async fail() { + this.failed = true; + } +} + +class FakeProviderAdapter { + constructor() { + this.calls = []; + } + + async callModel(request, execute) { + this.calls.push(request); + return await execute(); + } + + appendToolResults(messages, calls, results) { + for (const [index, call] of calls.entries()) { + messages.push({ + role: "tool", + name: call.name, + content: results[index] ?? "", + }); + } + } +} + +class FakeMemoryStore { + constructor() { + this.rows = []; + } + + persist(state) { + this.rows.push({ + requestId: state.requestId, + audit: [...state.audit], + artifacts: [...state.artifacts], + outputAttachments: [...state.outputAttachments], + }); + } +} + +function createBaseState() { + return { + requestId: "integration-request-1", + chatId: 10, + messageId: 20, + fromId: 30, + receivedAt: new Date().toISOString(), + text: "process my attachments", + settings: { + provider: "OLLAMA", + responseLanguage: "en", + voiceMode: "execute", + imageOutputMode: "photo", + }, + inputAttachments: [], + outputAttachments: [], + artifacts: [], + toolRankDecisions: [], + audit: [], + }; +} + +function artifact(kind, stage, extra = {}) { + return { + kind, + stage, + createdAt: "2026-05-18T00:00:00.000Z", + ...extra, + }; +} + +function outputAttachment(fileName, kind = "file") { + return { + direction: "output", + kind, + fileId: `${fileName}-file-id`, + fileName, + sizeBytes: 1024, + cachePath: `/tmp/${fileName}`, + }; +} + +test("integration pipeline rejects oversized attachment before later stages", async () => { + const stream = new FakeTelegramStreamMessage(); + const state = createBaseState(); + state.inputAttachments.push({ + direction: "input", + kind: "document", + fileId: "doc-oversized", + fileName: "big.pdf", + sizeBytes: PIPELINE_ATTACHMENT_LIMIT_BYTES + 1, + cachePath: "/tmp/big.pdf", + }); + + const pipeline = new UserRequestPipeline({ + stages: [{ + name: "input_size_gate", + async run() { + stream.setStatus("Checking size"); + const tooLarge = state.inputAttachments.some(attachment => attachment.sizeBytes > PIPELINE_ATTACHMENT_LIMIT_BYTES); + stream.clearStatus(); + + return { + stage: "input_size_gate", + status: tooLarge ? "fallback" : "succeeded", + fallbackAction: tooLarge ? "notify_user" : undefined, + }; + }, + }], + stageNames: ["input_size_gate"], + }); + + await pipeline.run(state, new AbortController().signal); + + assert.equal(state.audit.at(-1)?.status, "fallback"); + assert.equal(state.audit.at(-1)?.details?.fallbackAction, "notify_user"); + assert.equal(stream.status, ""); +}); + +test("integration pipeline carries artifacts through fake document, voice, tool and tts stages", async () => { + const stream = new FakeTelegramStreamMessage(); + const adapter = new FakeProviderAdapter(); + const store = new FakeMemoryStore(); + const state = createBaseState(); + state.inputAttachments.push( + { + direction: "input", + kind: "document", + fileId: "doc-1", + fileName: "contract.pdf", + sizeBytes: 1024, + cachePath: "/tmp/contract.pdf", + }, + { + direction: "input", + kind: "audio", + fileId: "audio-1", + fileName: "voice.ogg", + sizeBytes: 2048, + cachePath: "/tmp/voice.ogg", + }, + ); + + const pipeline = new UserRequestPipeline({ + stages: [ + { + name: "input_size_gate", + async run() { + return { + stage: "input_size_gate", + status: "succeeded", + }; + }, + }, + { + name: "document_rag", + async run() { + stream.setStatus("RAG"); + stream.clearStatus(); + return { + stage: "document_rag", + status: "succeeded", + artifacts: [artifact("rag", "document_rag", { + provider: "OLLAMA", + sourceAttachmentIds: ["doc-1"], + extractedText: "contract text", + })], + }; + }, + }, + { + name: "speech_to_text", + async run() { + return { + stage: "speech_to_text", + status: "succeeded", + artifacts: [artifact("transcript", "speech_to_text", { + text: "transcribed voice", + sourceAttachmentIds: ["audio-1"], + model: "fake-stt", + })], + }; + }, + }, + { + name: "model_call", + async run() { + const reply = await adapter.callModel({provider: "OLLAMA", model: "fake-model"}, async () => { + stream.append("final answer"); + return "final answer"; + }); + + return { + stage: "model_call", + status: "succeeded", + artifacts: [artifact("final_text", "model_call", { + text: reply, + })], + }; + }, + }, + { + name: "tool_loop", + async run() { + const calls = [{id: "tool-call-1", name: "read_file", argumentsText: "{\"path\":\"docs/a.md\"}"}]; + const results = ["tool result"]; + adapter.appendToolResults([], calls, results); + stream.recordToolExecution({ + toolName: "read_file", + callId: "tool-call-1", + argumentsText: "{\"path\":\"docs/a.md\"}", + resultChars: results[0].length, + startedAt: "2026-05-18T00:00:00.000Z", + finishedAt: "2026-05-18T00:00:01.000Z", + }); + + return { + stage: "tool_loop", + status: "succeeded", + artifacts: [artifact("tool_result", "tool_loop", { + toolName: "read_file", + callId: "tool-call-1", + resultText: results[0], + })], + }; + }, + }, + { + name: "persist_output_artifacts", + async run() { + const generatedFile = outputAttachment("report.txt", "file"); + stream.recordOutputAttachment({ + artifactKind: "generated_file", + fileName: generatedFile.fileName, + mimeType: "text/plain", + sizeBytes: generatedFile.sizeBytes, + messageId: 321, + }); + + return { + stage: "persist_output_artifacts", + status: "succeeded", + artifacts: [artifact("generated_file", "persist_output_artifacts", { + attachmentId: generatedFile.fileId, + })], + attachments: [generatedFile], + }; + }, + }, + { + name: "text_to_speech", + async run() { + stream.recordOutputAttachment({ + artifactKind: "tts_audio", + fileName: "answer.ogg", + mimeType: "audio/ogg", + sizeBytes: 4096, + messageId: 322, + }); + + return { + stage: "text_to_speech", + status: "succeeded", + artifacts: [artifact("tts_audio", "text_to_speech", { + attachmentId: "tts-audio-id", + })], + attachments: [outputAttachment("answer.ogg", "audio")], + }; + }, + }, + { + name: "audit_finish", + async run() { + store.persist(state); + return { + stage: "audit_finish", + status: "succeeded", + }; + }, + }, + ], + stageNames: [ + "input_size_gate", + "document_rag", + "speech_to_text", + "model_call", + "tool_loop", + "persist_output_artifacts", + "text_to_speech", + "audit_finish", + ], + }); + + await pipeline.run(state, new AbortController().signal); + + assert.equal(adapter.calls.length, 1); + assert.equal(stream.getText(), "final answer"); + assert.equal(stream.getToolExecutions().length, 1); + assert.equal(stream.getOutputAttachments().length, 2); + assert.equal(state.artifacts.some(entry => entry.kind === "rag"), true); + assert.equal(state.artifacts.some(entry => entry.kind === "transcript"), true); + assert.equal(state.artifacts.some(entry => entry.kind === "final_text"), true); + assert.equal(state.artifacts.some(entry => entry.kind === "tool_result"), true); + assert.equal(state.artifacts.some(entry => entry.kind === "generated_file"), true); + assert.equal(state.artifacts.some(entry => entry.kind === "tts_audio"), true); + assert.equal(store.rows.length, 1); + assert.equal(store.rows[0].artifacts.length >= 6, true); +}); + +test("integration pipeline stops on fail_request fallback", async () => { + const stream = new FakeTelegramStreamMessage(); + const state = createBaseState(); + const pipeline = new UserRequestPipeline({ + stages: [{ + name: "input_size_gate", + async run() { + stream.setStatus("Boom"); + throw new Error("boom"); + }, + }], + stageNames: ["input_size_gate", "document_rag"], + fallbackPolicies: [{ + stage: "input_size_gate", + onUnavailable: "fail_request", + onFailed: "fail_request", + }], + }); + + await assert.rejects(() => pipeline.run(state, new AbortController().signal), /PipelineRequestFailure/); + assert.equal(state.audit.some(entry => entry.stage === "document_rag"), false); +});