diff --git a/PIPELINE_TODO.md b/PIPELINE_TODO.md index 2e7a57a..cf50e78 100644 --- a/PIPELINE_TODO.md +++ b/PIPELINE_TODO.md @@ -135,14 +135,14 @@ ## 10. Operational cleanup and observability -- [ ] Add retention policy for `data/cache/internal-artifacts`. +- [x] Add retention policy for `data/cache/internal-artifacts`. - [ ] Add retention policy for stale RAG vector/library provider state. -- [ ] Add command or admin view for recent `ai_requests`. -- [ ] Add command or admin view for request audit by message id. -- [ ] Add command to inspect artifacts for a message. -- [ ] Add log correlation by `requestId` across AI logs, tool logs and DB audit. -- [ ] Add metrics counters: requests, failures, fallbacks, tool calls, RAG runs, TTS runs. -- [ ] Add startup migration logs for `ai_requests`, `attachments`, `artifacts`, `request_audit`. +- [x] Add command or admin view for recent `ai_requests`. +- [x] Add command or admin view for request audit by message id. +- [x] Add command to inspect artifacts for a message. +- [x] Add log correlation by `requestId` across AI logs, tool logs and DB audit. +- [x] Add metrics counters: requests, failures, fallbacks, tool calls, RAG runs, TTS runs. +- [x] Add startup migration logs for `ai_requests`, `attachments`, `artifacts`, `request_audit`. 
## Suggested order diff --git a/locales/en.json b/locales/en.json index c6c6e5f..0fc0243 100644 --- a/locales/en.json +++ b/locales/en.json @@ -183,6 +183,9 @@ "getWhenPluralUnitText": "{unit}s", "getWhenDurationText": "{prefix}{value} {unit}", "commandDescriptions": { + "aiAudit": "Inspect AI request audit and artifacts", + "aiMetrics": "Show AI observability counters", + "aiRequests": "Show recent AI requests", "ae": "evaluation", "adminsAdd": "Add user to admins", "adminsRemove": "Remove user from admins", diff --git a/locales/ru.json b/locales/ru.json index 04f0d39..da71827 100644 --- a/locales/ru.json +++ b/locales/ru.json @@ -209,6 +209,9 @@ "getWhenPluralUnitText": "{unit}", "getWhenDurationText": "{prefix}{value} {unit}", "commandDescriptions": { + "aiRequests": "Показать последние AI-запросы", + "aiAudit": "Показать аудит AI-запроса и артефакты", + "aiMetrics": "Показать счётчики AI-обсервабилити", "ae": "вычисление", "adminsAdd": "Добавить пользователя в администраторы", "adminsRemove": "Удалить пользователя из администраторов", diff --git a/locales/ua.json b/locales/ua.json index 947c1ab..d23efbf 100644 --- a/locales/ua.json +++ b/locales/ua.json @@ -208,6 +208,9 @@ "getWhenPluralUnitText": "{unit}", "getWhenDurationText": "{prefix}{value} {unit}", "commandDescriptions": { + "aiRequests": "Показати останні AI-запити", + "aiAudit": "Показати аудит AI-запиту та артефакти", + "aiMetrics": "Показати лічильники AI-спостережуваності", "help": "Показати список команд", "settings": "Налаштування користувача", "start": "Запустити бота", diff --git a/src/ai/telegram-stream-message.ts b/src/ai/telegram-stream-message.ts index 9fa2624..029738d 100644 --- a/src/ai/telegram-stream-message.ts +++ b/src/ai/telegram-stream-message.ts @@ -5,6 +5,7 @@ import {Environment} from "../common/environment"; import {MessageStore} from "../common/message-store"; import {createQueuedFunction} from "../util/async-lock"; import {enqueueTelegramApiCall} from 
"../util/telegram-api-queue"; +import {appLogger} from "../logging/logger"; import fs from "node:fs"; import path from "node:path"; import {StoredAttachment, StoredAttachmentKind} from "../model/stored-attachment"; @@ -13,11 +14,13 @@ import {prepareTelegramMarkdownV2} from "../util/markdown-v2-renderer"; import {AiProvider} from "../model/ai-provider"; import {AI_IMAGE_OUTPUT_MODE_DOCUMENT, UserAiImageOutputMode} from "../common/user-ai-settings"; import {PIPELINE_ATTACHMENT_LIMIT_BYTES} from "./user-request-pipeline"; +import {recordToolCall} from "../common/ai-observability.js"; const TELEGRAM_LIMIT = 4096; const TELEGRAM_CAPTION_LIMIT = 1024; const TELEGRAM_PHOTO_LIMIT_BYTES = 10 * 1024 * 1024; const EDIT_INTERVAL_MS = 4500; +const logger = appLogger.child("telegram-stream-message"); export type TelegramArtifactFile = { kind: "image" | "file"; @@ -238,6 +241,13 @@ export class TelegramStreamMessage { recordToolExecution(record: TelegramToolExecutionRecord): void { this.toolExecutions.push(record); + recordToolCall(); + logger.debug("tool.execution.recorded", { + requestId: this.cancelRequestId, + toolName: record.toolName, + callId: record.callId, + resultChars: record.resultChars, + }); } getToolExecutions(): TelegramToolExecutionRecord[] { @@ -246,6 +256,13 @@ export class TelegramStreamMessage { recordOutputAttachment(record: TelegramOutputAttachmentRecord): void { this.outputAttachments.push(record); + logger.debug("output_attachment.recorded", { + requestId: this.cancelRequestId, + artifactKind: record.artifactKind, + fileName: record.fileName, + sizeBytes: record.sizeBytes, + messageId: record.messageId, + }); } getOutputAttachments(): TelegramOutputAttachmentRecord[] { diff --git a/src/ai/unified-ai-request-pipeline.ts b/src/ai/unified-ai-request-pipeline.ts index 20a7be4..9757874 100644 --- a/src/ai/unified-ai-request-pipeline.ts +++ b/src/ai/unified-ai-request-pipeline.ts @@ -15,6 +15,7 @@ import {prepareDocumentRag} from "./document-rag-pipeline"; 
import {persistRagArtifactAttachment} from "./rag-artifact-store"; import {persistTranscriptArtifactAttachment} from "./transcript-artifact-store"; import type {ToolRuntimeContext} from "./tools/runtime"; +import {recordPipelineFallback, recordRagRun} from "../common/ai-observability.js"; import { appendTranscriptToChatMessages, collectTextMessages, @@ -64,7 +65,7 @@ function runtimeTargetFor(options: UnifiedRunOptions, config: RuntimeConfigSnaps function createAiRequestPipelineState(options: UnifiedRunOptions): UserRequestPipelineState { return { - requestId: `ai:${options.msg.chat.id}:${options.msg.message_id}:${Date.now()}`, + requestId: options.requestId ?? `ai:${options.msg.chat.id}:${options.msg.message_id}:${Date.now()}`, chatId: options.msg.chat.id, messageId: options.msg.message_id, replyToMessageId: options.msg.reply_to_message?.message_id, @@ -274,6 +275,10 @@ export async function prepareUnifiedAiRequestPipeline(params: { await streamMessage.storeInternalAttachment(ragArtifact); } + if (prepared.preparedDocumentRag) { + recordRagRun(); + } + return { stage: "document_rag", status: prepared.preparedDocumentRag ? 
"succeeded" : "skipped", @@ -313,11 +318,13 @@ export async function prepareUnifiedAiRequestPipeline(params: { "audit_finish", ], onFallback: async decision => { + recordPipelineFallback(decision.action); if (decision.action === "use_alternate_target") { aiLog("warn", "request.fallback.use_alternate_target", { provider: options.provider, stage: decision.stage, reason: decision.reason, + requestId: state.requestId, ...buildToolRankFallbackTargetDetails(options.provider, config), }); } @@ -327,6 +334,7 @@ export async function prepareUnifiedAiRequestPipeline(params: { provider: options.provider, stage: decision.stage, reason: decision.reason, + requestId: state.requestId, }); } diff --git a/src/ai/unified-ai-response-pipeline.ts b/src/ai/unified-ai-response-pipeline.ts index b80da70..1e6b015 100644 --- a/src/ai/unified-ai-response-pipeline.ts +++ b/src/ai/unified-ai-response-pipeline.ts @@ -34,6 +34,7 @@ import { } from "./text-to-speech"; import {persistFinalTextArtifactAttachment} from "./final-response-artifact-store"; import {aiLog} from "../logging/ai-logger"; +import {recordPipelineFallback, recordTtsRun} from "../common/ai-observability.js"; function nowIso(): string { return new Date().toISOString(); @@ -41,7 +42,7 @@ function nowIso(): string { function createResponsePipelineState(options: UnifiedRunOptions): UserRequestPipelineState { return { - requestId: `ai-response:${options.msg.chat.id}:${options.msg.message_id}:${Date.now()}`, + requestId: options.requestId ?? 
`ai-response:${options.msg.chat.id}:${options.msg.message_id}:${Date.now()}`, chatId: options.msg.chat.id, messageId: options.msg.message_id, replyToMessageId: options.msg.reply_to_message?.message_id, @@ -357,6 +358,7 @@ export async function runUnifiedAiResponsePipeline(params: { name: "text_to_speech", async run() { const status = await synthesizeResponseIfRequested({options, config, streamMessage}); + recordTtsRun(status); return { stage: "text_to_speech", status, @@ -396,11 +398,13 @@ export async function runUnifiedAiResponsePipeline(params: { "audit_finish", ], onFallback: async decision => { + recordPipelineFallback(decision.action); if (decision.action === "use_alternate_target") { aiLog("warn", "response.fallback.use_alternate_target", { provider: options.provider, stage: decision.stage, reason: decision.reason, + requestId: state.requestId, ...buildToolRankFallbackTargetDetails(options.provider, config), }); } @@ -410,6 +414,7 @@ export async function runUnifiedAiResponsePipeline(params: { provider: options.provider, stage: decision.stage, reason: decision.reason, + requestId: state.requestId, }); } diff --git a/src/ai/unified-ai-runner.shared.ts b/src/ai/unified-ai-runner.shared.ts index acbe648..4649b6e 100644 --- a/src/ai/unified-ai-runner.shared.ts +++ b/src/ai/unified-ai-runner.shared.ts @@ -78,6 +78,7 @@ function photoGenDir(): string { export type UnifiedRunOptions = { provider: AiProvider; msg: Message; + requestId?: string; isGuestMsg?: boolean; text: string; stream?: boolean; diff --git a/src/ai/unified-ai-runner.ts b/src/ai/unified-ai-runner.ts index f3811e8..a4722c6 100644 --- a/src/ai/unified-ai-runner.ts +++ b/src/ai/unified-ai-runner.ts @@ -35,6 +35,7 @@ import {persistErrorArtifactAttachment} from "./final-response-artifact-store"; import {runUnifiedAiResponsePipeline} from "./unified-ai-response-pipeline"; import {AiRequestStore} from "../common/ai-request-store"; import type {StoredAiRequestStatus} from "../model/stored-ai-request"; 
+import {recordAiRequestFinish, recordAiRequestStart} from "../common/ai-observability.js"; export type {ToolCallData} from "./unified-ai-runner.shared"; export {snapshotModel, providerTargets, ollamaModelNames} from "./unified-ai-runner.shared"; @@ -49,6 +50,7 @@ async function executeUnifiedAiRequest( const requestStartedAt = Date.now(); let preparedRequest: Awaited> | undefined; aiLog("info", "request.execute.start", { + requestId: options.requestId, provider: providerName(options.provider), stream: options.stream ?? true, think: options.think, @@ -74,6 +76,7 @@ async function executeUnifiedAiRequest( if (preparedRequest.finishAfterTranscript) return; aiLog("debug", "request.messages.collected", { + requestId: options.requestId, provider: providerName(options.provider), chatMessages: preparedRequest.chatMessages.length, imageCount: preparedRequest.imageCount, @@ -91,6 +94,7 @@ async function executeUnifiedAiRequest( controller, }); aiLog("success", "request.execute.done", { + requestId: options.requestId, provider: providerName(options.provider), duration: aiLogDuration(requestStartedAt), responseChars: streamMessage.getText().length, @@ -99,6 +103,7 @@ async function executeUnifiedAiRequest( return; } catch (e) { aiLog("error", "request.execute.failed", { + requestId: options.requestId, provider: providerName(options.provider), duration: aiLogDuration(requestStartedAt), error: e instanceof Error ? e : String(e), @@ -117,6 +122,7 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise { const requestedAttachmentKinds = await collectRequestedAttachmentKinds(options.msg); aiLog("info", "run.start", { + requestId: options.requestId ?? 
`pending:${options.msg.chat.id}:${options.msg.message_id}`, provider: providerName(options.provider), model: snapshotModel(options.provider, config), message: aiLogMessageIdentity(options.msg), @@ -133,6 +139,7 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise { if (await rejectUnsupportedAttachments(options.provider, snapshotModel(options.provider, config), options.msg, config, requestedAttachmentKinds)) { aiLog("warn", "run.rejected.unsupported_attachment", { + requestId: options.requestId ?? `pending:${options.msg.chat.id}:${options.msg.message_id}`, provider: providerName(options.provider), requestedAttachmentKinds: [...requestedAttachmentKinds], }); @@ -150,6 +157,7 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise { text: Environment.getAttachmentMissingFromCacheText(cached.missing[0].fileName), }).catch(logError); aiLog("warn", "run.rejected.missing_attachment_cache", { + requestId: options.requestId ?? `pending:${options.msg.chat.id}:${options.msg.message_id}`, missing: cached.missing.map(a => ({kind: a.kind, fileName: a.fileName, cachePath: a.cachePath})), }); return; @@ -166,6 +174,8 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise { provider: providerName(options.provider), controller }); + options.requestId ??= cancel.id; + const requestId = options.requestId; const streamMessage = new TelegramStreamMessage( options.msg, cancel.id, @@ -180,10 +190,11 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise { ); cancel.onCancel = () => streamMessage.cancel(cancel.provider); const queueTarget = resolveAiRequestQueueTarget(options, config, requestedAttachmentKinds); - aiLog("debug", "run.queue.target", {target: aiLogProviderTarget(queueTarget), cancelId: cancel.id}); + aiLog("debug", "run.queue.target", {requestId, target: aiLogProviderTarget(queueTarget), cancelId: cancel.id}); const aiRequestStartedAt = new Date().toISOString(); + recordAiRequestStart(); await 
AiRequestStore.put({ - requestId: cancel.id, + requestId, chatId: options.msg.chat.id, messageId: options.msg.message_id, fromId: options.msg.from?.id ?? 0, @@ -197,7 +208,7 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise { const queueMessage = await streamMessage.start(Environment.waitThinkText); responseMessageId = queueMessage.message_id; await AiRequestStore.put({ - requestId: cancel.id, + requestId, chatId: options.msg.chat.id, messageId: options.msg.message_id, responseMessageId, @@ -207,8 +218,9 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise { status: "running", startedAt: aiRequestStartedAt, }).catch(logError); setAiCancelMessageId(cancel.id, queueMessage.message_id); aiLog("info", "run.queue.enter", { + requestId, cancelId: cancel.id, queueMessageId: queueMessage.message_id, target: aiLogProviderTarget(queueTarget), @@ -217,15 +229,16 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise { await aiProviderRequestQueue.enqueue(queueTarget, { signal: controller.signal, onPositionChange: async requestsBefore => { - aiLog("debug", "run.queue.position", {cancelId: cancel.id, requestsBefore}); + aiLog("debug", "run.queue.position", {requestId, cancelId: cancel.id, requestsBefore}); streamMessage.setStatus(Environment.getAiQueueText(options.provider, requestsBefore)); await streamMessage.flush(); }, run: async (): Promise => { const queueWaitFinishedAt = Date.now(); - aiLog("info", "run.queue.dequeued", {cancelId: cancel.id}); + aiLog("info", "run.queue.dequeued", {requestId, cancelId: cancel.id}); const downloads = attachmentsToDownloadedFiles(cached.attachments); aiLog("debug", "run.downloads.ready", { + requestId, count: downloads.length, downloads: downloads.map(d => ({ kind: d.kind, @@ -239,12 +252,13 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise { await executeUnifiedAiRequest(options, config,
downloads, controller, streamMessage); aiRequestStatus = "succeeded"; aiLog("success", "run.queue.task.done", { + requestId, cancelId: cancel.id, duration: aiLogDuration(queueWaitFinishedAt), }); } finally { cleanupDownloads(downloads); - aiLog("debug", "run.downloads.cleaned", {cancelId: cancel.id, count: downloads.length}); + aiLog("debug", "run.downloads.cleaned", {requestId, cancelId: cancel.id, count: downloads.length}); } return null; }, @@ -253,13 +267,13 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise { if (controller.signal.aborted || isAbortError(e instanceof Error ? e : String(e))) { aiRequestStatus = "aborted"; aiRequestError = e instanceof Error ? e.message : String(e); - aiLog("warn", "run.aborted", {cancelId: cancel.id, duration: aiLogDuration(startedAt), error: e instanceof Error ? e : String(e)}); + aiLog("warn", "run.aborted", {requestId, cancelId: cancel.id, duration: aiLogDuration(startedAt), error: e instanceof Error ? e : String(e)}); streamMessage.replaceText(streamMessage.getText()); await streamMessage.finish(); } else { aiRequestStatus = "failed"; aiRequestError = e instanceof Error ? e.message : String(e); - aiLog("error", "run.failed", {cancelId: cancel.id, duration: aiLogDuration(startedAt), error: e instanceof Error ? e : String(e)}); + aiLog("error", "run.failed", {requestId, cancelId: cancel.id, duration: aiLogDuration(startedAt), error: e instanceof Error ? e : String(e)}); const errorMessage = e instanceof Error ? e.message : String(e); await streamMessage.fail(e instanceof Error ?
e : String(e)); try { @@ -279,7 +293,7 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise { } finally { clearTimeout(timeout); await AiRequestStore.put({ - requestId: cancel.id, + requestId, chatId: options.msg.chat.id, messageId: options.msg.message_id, responseMessageId, @@ -291,8 +305,10 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise { finishedAt: new Date().toISOString(), error: aiRequestError, }).catch(logError); + recordAiRequestFinish(aiRequestStatus); finishAiRequest(cancel.id); aiLog("success", "run.finished", { + requestId, cancelId: cancel.id, provider: providerName(options.provider), duration: aiLogDuration(startedAt), diff --git a/src/commands/ai-audit.ts b/src/commands/ai-audit.ts new file mode 100644 index 0000000..83db90d --- /dev/null +++ b/src/commands/ai-audit.ts @@ -0,0 +1,33 @@ +import {Message} from "typescript-telegram-bot-api"; +import {Command} from "../base/command.js"; +import {Requirements} from "../base/requirements.js"; +import {Requirement} from "../base/requirement.js"; +import {Environment} from "../common/environment.js"; +import {buildAiAuditReport, replyWithTrimmedText, resolveAuditTarget} from "./ai-observability.js"; +import {logError, sendErrorPlaceholder} from "../util/utils.js"; + +export class AIAudit extends Command { + command = ["aiaudit", "audit"]; + argsMode = "optional" as const; + + requirements = Requirements.Build(Requirement.BOT_ADMIN); + + title = Environment.commandTitles.aiAudit; + description = Environment.commandDescriptions.aiAudit; + + async execute(msg: Message, match?: RegExpExecArray | null): Promise { + try { + const target = resolveAuditTarget(msg, match?.[3] ??
null); + if (!target) { + await replyWithTrimmedText(msg, "Usage: reply to a message or pass messageId, or chatId messageId."); + return; + } + + const text = await buildAiAuditReport(target); + await replyWithTrimmedText(msg, text); + } catch (error) { + logError(error instanceof Error ? error : String(error)); + await sendErrorPlaceholder(msg).catch(logError); + } + } +} diff --git a/src/commands/ai-metrics.ts b/src/commands/ai-metrics.ts new file mode 100644 index 0000000..615724e --- /dev/null +++ b/src/commands/ai-metrics.ts @@ -0,0 +1,27 @@ +import {Message} from "typescript-telegram-bot-api"; +import {Command} from "../base/command.js"; +import {Requirements} from "../base/requirements.js"; +import {Requirement} from "../base/requirement.js"; +import {Environment} from "../common/environment.js"; +import {buildAiMetricsReport, replyWithTrimmedText} from "./ai-observability.js"; +import {logError, sendErrorPlaceholder} from "../util/utils.js"; + +export class AIMetrics extends Command { + command = ["aimetrics", "metrics"]; + argsMode = "none" as const; + + requirements = Requirements.Build(Requirement.BOT_ADMIN); + + title = Environment.commandTitles.aiMetrics; + description = Environment.commandDescriptions.aiMetrics; + + async execute(msg: Message): Promise { + try { + const text = await buildAiMetricsReport(); + await replyWithTrimmedText(msg, text); + } catch (error) { + logError(error instanceof Error ? 
error : String(error)); + await sendErrorPlaceholder(msg).catch(logError); + } + } +} diff --git a/src/commands/ai-observability.ts b/src/commands/ai-observability.ts new file mode 100644 index 0000000..fd8ce73 --- /dev/null +++ b/src/commands/ai-observability.ts @@ -0,0 +1,155 @@ +import {Message} from "typescript-telegram-bot-api"; +import {DatabaseManager} from "../db/database-manager.js"; +import type {AttachmentDbRow} from "../db/db-types.js"; +import {replyToMessage} from "../util/utils.js"; +import {snapshotAiObservability} from "../common/ai-observability.js"; + +export type AuditTarget = { + chatId: number; + messageId: number; +}; + +export function resolveAuditTarget(msg: Message, argsText?: string | null): AuditTarget | null { + if (msg.reply_to_message) { + return { + chatId: msg.chat.id, + messageId: msg.reply_to_message.message_id, + }; + } + + const args = argsText?.trim().split(/\s+/).filter(Boolean) ?? []; + if (!args.length) return null; + + if (args.length === 1) { + const messageId = Number(args[0]); + if (!Number.isSafeInteger(messageId)) return null; + return { + chatId: msg.chat.id, + messageId, + }; + } + + const chatId = Number(args[0]); + const messageId = Number(args[1]); + if (!Number.isSafeInteger(chatId) || !Number.isSafeInteger(messageId)) return null; + + return {chatId, messageId}; +} + +function formatSize(bytes: number | null | undefined): string { + if (!Number.isFinite(bytes ?? NaN)) return "n/a"; + const value = Number(bytes); + if (value >= 1024 * 1024) return `${(value / (1024 * 1024)).toFixed(1)} MB`; + if (value >= 1024) return `${(value / 1024).toFixed(1)} KB`; + return `${value} B`; +} + +function clip(value: string | null | undefined, max = 120): string { + const text = (value ?? "").trim(); + if (!text) return "n/a"; + return text.length <= max ?
text : `${text.slice(0, max)}…`; +} + +function formatAttachmentLine(index: number, attachment: AttachmentDbRow): string { + return [ + `${index + 1}.`, + attachment.direction, + attachment.kind, + attachment.fileName, + `size=${formatSize(attachment.sizeBytes)}`, + attachment.artifactKind ? `artifact=${attachment.artifactKind}` : null, + ].filter(Boolean).join(" "); +} + +export async function buildAiAuditReport(target: AuditTarget): Promise { + const [request, audits, artifacts, attachments] = await Promise.all([ + DatabaseManager.getAiRequestByMessage(target.chatId, target.messageId), + DatabaseManager.getRequestAuditsByMessage(target.chatId, target.messageId), + DatabaseManager.getArtifactsByMessage(target.chatId, target.messageId), + DatabaseManager.getAttachmentsByMessage(target.chatId, target.messageId), + ]); + + const lines: string[] = [ + "AI observability audit", + `chatId: ${target.chatId}`, + `messageId: ${target.messageId}`, + "", + "AI request:", + ]; + + if (request) { + lines.push( + ` requestId: ${request.requestId}`, + ` provider: ${request.provider}`, + ` model: ${request.model}`, + ` status: ${request.status}`, + ` startedAt: ${request.startedAt}`, + ` finishedAt: ${request.finishedAt ?? "n/a"}`, + ` error: ${clip(request.error, 240)}`, + ); + } else { + lines.push(" not found"); + } + + lines.push("", `Pipeline audits: ${audits.length}`); + audits.slice(0, 12).forEach((audit, index) => { + lines.push( + ` ${index + 1}. ${audit.stage} ${audit.status}` + + `${audit.durationMs !== null ? ` ${audit.durationMs}ms` : ""}` + + `${audit.provider ? ` provider=${audit.provider}` : ""}` + + `${audit.model ? ` model=${audit.model}` : ""}` + + `${audit.error ? ` error=${clip(audit.error, 120)}` : ""}`, + ); + }); + if (audits.length > 12) { + lines.push(` … and ${audits.length - 12} more`); + } + + lines.push("", `Artifacts: ${artifacts.length}`); + artifacts.slice(0, 12).forEach((artifact, index) => { + lines.push( + ` ${index + 1}. 
${artifact.kind} stage=${artifact.stage}` + + `${artifact.attachmentId ? ` attachmentId=${artifact.attachmentId}` : ""}` + + `${artifact.createdAt ? ` createdAt=${artifact.createdAt}` : ""}`, + ); + }); + if (artifacts.length > 12) { + lines.push(` … and ${artifacts.length - 12} more`); + } + + lines.push("", `Attachments: ${attachments.length}`); + attachments.slice(0, 12).forEach((attachment, index) => { + lines.push(` ${formatAttachmentLine(index, attachment)}`); + }); + if (attachments.length > 12) { + lines.push(` … and ${attachments.length - 12} more`); + } + + return lines.join("\n"); +} + +export async function buildAiMetricsReport(): Promise { + const snapshot = snapshotAiObservability(); + const [aiRequests, attachments, artifacts, requestAudits] = await Promise.all([ + DatabaseManager.getAllAiRequests(), + DatabaseManager.getAllAttachments(), + DatabaseManager.getAllArtifacts(), + DatabaseManager.getAllRequestAudits(), + ]); + + return [ + "AI observability metrics", + `requests: total=${snapshot.requests.total} succeeded=${snapshot.requests.succeeded} failed=${snapshot.requests.failed} aborted=${snapshot.requests.aborted}`, + `fallbacks: total=${snapshot.fallbacks.total} ignore=${snapshot.fallbacks.ignore} notify_user=${snapshot.fallbacks.notifyUser} continue_without_stage=${snapshot.fallbacks.continueWithoutStage} use_alternate_target=${snapshot.fallbacks.useAlternateTarget} fail_request=${snapshot.fallbacks.failRequest}`, + `tool calls: ${snapshot.toolCalls}`, + `RAG runs: ${snapshot.ragRuns}`, + `TTS runs: total=${snapshot.ttsRuns.total} succeeded=${snapshot.ttsRuns.succeeded} failed=${snapshot.ttsRuns.failed} skipped=${snapshot.ttsRuns.skipped}`, + `db rows: ai_requests=${aiRequests.length} attachments=${attachments.length} artifacts=${artifacts.length} request_audit=${requestAudits.length}`, + ].join("\n"); +} + +export async function replyWithTrimmedText(msg: Message, text: string): Promise { + const maxLength = 3800; + const nextText = 
text.length <= maxLength ? text : `${text.slice(0, maxLength)}\n… (trimmed)`; + await replyToMessage({message: msg, text: nextText}); +} diff --git a/src/commands/ai-requests.ts b/src/commands/ai-requests.ts new file mode 100644 index 0000000..99df30e --- /dev/null +++ b/src/commands/ai-requests.ts @@ -0,0 +1,51 @@ +import {Message} from "typescript-telegram-bot-api"; +import {Command} from "../base/command.js"; +import {Requirements} from "../base/requirements.js"; +import {Requirement} from "../base/requirement.js"; +import {Environment} from "../common/environment.js"; +import {DatabaseManager} from "../db/database-manager.js"; +import {logError, sendErrorPlaceholder} from "../util/utils.js"; +import {replyWithTrimmedText} from "./ai-observability.js"; + +function formatRequestLine(index: number, request: Awaited>[number]): string { + return [ + `${index + 1}.`, + `requestId=${request.requestId}`, + `chatId=${request.chatId}`, + `messageId=${request.messageId}`, + request.responseMessageId ? `responseMessageId=${request.responseMessageId}` : null, + `provider=${request.provider}`, + `model=${request.model}`, + `status=${request.status}`, + `startedAt=${request.startedAt}`, + request.finishedAt ? `finishedAt=${request.finishedAt}` : null, + request.error ? 
`error=${request.error}` : null, + ].filter(Boolean).join(" "); +} + +export class AIRequests extends Command { + command = ["airequests"]; + argsMode = "none" as const; + + requirements = Requirements.Build(Requirement.BOT_ADMIN); + + title = Environment.commandTitles.aiRequests; + description = Environment.commandDescriptions.aiRequests; + + async execute(msg: Message): Promise { + try { + const requests = (await DatabaseManager.getAllAiRequests()).slice(-10).reverse(); + const lines = [ + "Recent AI requests", + `count: ${requests.length}`, + "", + ...requests.map((request, index) => formatRequestLine(index, request)), + ]; + + await replyWithTrimmedText(msg, lines.join("\n")); + } catch (error) { + logError(error instanceof Error ? error : String(error)); + await sendErrorPlaceholder(msg).catch(logError); + } + } +} diff --git a/src/common/ai-observability.ts b/src/common/ai-observability.ts new file mode 100644 index 0000000..79b52a5 --- /dev/null +++ b/src/common/ai-observability.ts @@ -0,0 +1,123 @@ +import type {PipelineFallbackAction} from "../ai/user-request-pipeline"; +import type {StoredAiRequestStatus} from "../model/stored-ai-request.js"; + +type CounterSnapshot = { + total: number; + succeeded: number; + failed: number; + aborted: number; +}; + +export type AiObservabilitySnapshot = { + requests: CounterSnapshot; + fallbacks: { + total: number; + ignore: number; + notifyUser: number; + continueWithoutStage: number; + useAlternateTarget: number; + failRequest: number; + }; + toolCalls: number; + ragRuns: number; + ttsRuns: { + total: number; + succeeded: number; + failed: number; + skipped: number; + }; +}; + +const requestCounters = { + total: 0, + succeeded: 0, + failed: 0, + aborted: 0, +}; + +const fallbackCounters = { + total: 0, + ignore: 0, + notifyUser: 0, + continueWithoutStage: 0, + useAlternateTarget: 0, + failRequest: 0, +}; + +const ttsCounters = { + total: 0, + succeeded: 0, + failed: 0, + skipped: 0, +}; + +let toolCalls = 0; +let 
ragRuns = 0; + +function incrementFallback(action: PipelineFallbackAction): void { + fallbackCounters.total += 1; + switch (action) { + case "ignore": + fallbackCounters.ignore += 1; + break; + case "notify_user": + fallbackCounters.notifyUser += 1; + break; + case "continue_without_stage": + fallbackCounters.continueWithoutStage += 1; + break; + case "use_alternate_target": + fallbackCounters.useAlternateTarget += 1; + break; + case "fail_request": + fallbackCounters.failRequest += 1; + break; + } +} + +export function recordAiRequestStart(): void { + requestCounters.total += 1; +} + +export function recordAiRequestFinish(status: StoredAiRequestStatus): void { + switch (status) { + case "succeeded": + requestCounters.succeeded += 1; + break; + case "failed": + requestCounters.failed += 1; + break; + case "aborted": + requestCounters.aborted += 1; + break; + case "running": + break; + } +} + +export function recordPipelineFallback(action: PipelineFallbackAction): void { + incrementFallback(action); +} + +export function recordToolCall(): void { + toolCalls += 1; +} + +export function recordRagRun(): void { + ragRuns += 1; +} + +export function recordTtsRun(status: "succeeded" | "failed" | "skipped"): void { + ttsCounters.total += 1; + ttsCounters[status] += 1; +} + +export function snapshotAiObservability(): AiObservabilitySnapshot { + return { + requests: {...requestCounters}, + fallbacks: {...fallbackCounters}, + toolCalls, + ragRuns, + ttsRuns: {...ttsCounters}, + }; +} diff --git a/src/common/environment.ts b/src/common/environment.ts index e4429ba..f19e4f6 100644 --- a/src/common/environment.ts +++ b/src/common/environment.ts @@ -991,6 +991,9 @@ export class Environment { choice: "/choice a, b, ..., c", coin: "/coin", debug: "/debug", + aiRequests: "/aiRequests", + aiAudit: "/aiAudit [reply|messageId|chatId messageId]", + aiMetrics: "/aiMetrics", dice: "/dice", distort: "/distort [amp] [wavelength]", help: "/help", @@ -1041,6 +1044,9 @@ export class 
Environment { choice: this.text("commandDescriptions.choice", "Choose a random value"), coin: this.text("commandDescriptions.coin", "Heads or tails"), debug: this.text("commandDescriptions.debug", "Returns msg (or reply) as json"), + aiRequests: this.text("commandDescriptions.aiRequests", "Show recent AI requests"), + aiAudit: this.text("commandDescriptions.aiAudit", "Inspect AI request audit and artifacts"), + aiMetrics: this.text("commandDescriptions.aiMetrics", "Show AI observability counters"), dice: this.text("commandDescriptions.dice", "Sends random or specific dice"), distort: this.text("commandDescriptions.distort", "Distortion of picture"), help: this.text("commandDescriptions.help", "Show list of commands"), diff --git a/src/index.ts b/src/index.ts index 91145a5..8795588 100644 --- a/src/index.ts +++ b/src/index.ts @@ -75,6 +75,9 @@ import {UserSettingsCallback} from "./callback_commands/user-settings.js"; import {TextToSpeech} from "./commands/text-to-speech.js"; import {SpeechToText} from "./commands/speech-to-text.js"; import {cleanupInternalArtifactCache} from "./ai/internal-artifact-store.js"; +import {AIAudit} from "./commands/ai-audit.js"; +import {AIMetrics} from "./commands/ai-metrics.js"; +import {AIRequests} from "./commands/ai-requests.js"; process.setUncaughtExceptionCaptureCallback(logError); @@ -119,6 +122,9 @@ export const commands: Command[] = [ new Settings(), new TextToSpeech(), new SpeechToText(), + new AIRequests(), + new AIAudit(), + new AIMetrics(), new AdminsAdd(), new AdminsRemove(), @@ -272,6 +278,21 @@ async function main() { }, () => ({notesRootFilePath})); await measureStartupStep("cleanup_internal_artifacts", () => cleanupInternalArtifactCache(), () => ({retentionDays: 14})); + await measureStartupStep("observability.snapshot", async () => { + const [aiRequests, attachments, artifacts, requestAudits] = await Promise.all([ + DatabaseManager.getAllAiRequests(), + DatabaseManager.getAllAttachments(), + 
DatabaseManager.getAllArtifacts(), + DatabaseManager.getAllRequestAudits(), + ]); + + return { + aiRequests: aiRequests.length, + attachments: attachments.length, + artifacts: artifacts.length, + requestAudits: requestAudits.length, + }; + }, () => ({tables: ["ai_requests", "attachments", "artifacts", "request_audit"]})); const cmds = await measureStartupStep("build_commands", () => commands.filter(cmd => { return cmd.title && cmd.title.startsWith("/") && cmd.title.split(" ").length === 1 && cmd.description; diff --git a/test/ai-observability.test.mjs b/test/ai-observability.test.mjs new file mode 100644 index 0000000..966bfdb --- /dev/null +++ b/test/ai-observability.test.mjs @@ -0,0 +1,24 @@ +import test from "node:test"; +import assert from "node:assert/strict"; + +const observability = await import("../dist/common/ai-observability.js"); + +test("ai observability snapshot counts recorded events", () => { + const before = observability.snapshotAiObservability(); + + observability.recordAiRequestStart(); + observability.recordAiRequestFinish("succeeded"); + observability.recordPipelineFallback("notify_user"); + observability.recordToolCall(); + observability.recordRagRun(); + observability.recordTtsRun("skipped"); + + const after = observability.snapshotAiObservability(); + + assert.equal(after.requests.total, before.requests.total + 1); + assert.equal(after.requests.succeeded, before.requests.succeeded + 1); + assert.equal(after.fallbacks.notifyUser, before.fallbacks.notifyUser + 1); + assert.equal(after.toolCalls, before.toolCalls + 1); + assert.equal(after.ragRuns, before.ragRuns + 1); + assert.equal(after.ttsRuns.skipped, before.ttsRuns.skipped + 1); +});