Add AI observability commands and metrics

This commit is contained in:
2026-05-18 20:58:19 +03:00
parent 53e9798193
commit 75253534d8
17 changed files with 516 additions and 20 deletions
+17
View File
@@ -5,6 +5,7 @@ import {Environment} from "../common/environment";
import {MessageStore} from "../common/message-store";
import {createQueuedFunction} from "../util/async-lock";
import {enqueueTelegramApiCall} from "../util/telegram-api-queue";
import {appLogger} from "../logging/logger";
import fs from "node:fs";
import path from "node:path";
import {StoredAttachment, StoredAttachmentKind} from "../model/stored-attachment";
@@ -13,11 +14,13 @@ import {prepareTelegramMarkdownV2} from "../util/markdown-v2-renderer";
import {AiProvider} from "../model/ai-provider";
import {AI_IMAGE_OUTPUT_MODE_DOCUMENT, UserAiImageOutputMode} from "../common/user-ai-settings";
import {PIPELINE_ATTACHMENT_LIMIT_BYTES} from "./user-request-pipeline";
import {recordToolCall} from "../common/ai-observability.js";
const TELEGRAM_LIMIT = 4096;
const TELEGRAM_CAPTION_LIMIT = 1024;
const TELEGRAM_PHOTO_LIMIT_BYTES = 10 * 1024 * 1024;
const EDIT_INTERVAL_MS = 4500;
const logger = appLogger.child("telegram-stream-message");
export type TelegramArtifactFile = {
kind: "image" | "file";
@@ -238,6 +241,13 @@ export class TelegramStreamMessage {
recordToolExecution(record: TelegramToolExecutionRecord): void {
this.toolExecutions.push(record);
recordToolCall();
logger.debug("tool.execution.recorded", {
requestId: this.cancelRequestId,
toolName: record.toolName,
callId: record.callId,
resultChars: record.resultChars,
});
}
getToolExecutions(): TelegramToolExecutionRecord[] {
@@ -246,6 +256,13 @@ export class TelegramStreamMessage {
recordOutputAttachment(record: TelegramOutputAttachmentRecord): void {
this.outputAttachments.push(record);
logger.debug("output_attachment.recorded", {
requestId: this.cancelRequestId,
artifactKind: record.artifactKind,
fileName: record.fileName,
sizeBytes: record.sizeBytes,
messageId: record.messageId,
});
}
getOutputAttachments(): TelegramOutputAttachmentRecord[] {
+9 -1
View File
@@ -15,6 +15,7 @@ import {prepareDocumentRag} from "./document-rag-pipeline";
import {persistRagArtifactAttachment} from "./rag-artifact-store";
import {persistTranscriptArtifactAttachment} from "./transcript-artifact-store";
import type {ToolRuntimeContext} from "./tools/runtime";
import {recordPipelineFallback, recordRagRun} from "../common/ai-observability.js";
import {
appendTranscriptToChatMessages,
collectTextMessages,
@@ -64,7 +65,7 @@ function runtimeTargetFor(options: UnifiedRunOptions, config: RuntimeConfigSnaps
function createAiRequestPipelineState(options: UnifiedRunOptions): UserRequestPipelineState {
return {
requestId: `ai:${options.msg.chat.id}:${options.msg.message_id}:${Date.now()}`,
requestId: options.requestId ?? `ai:${options.msg.chat.id}:${options.msg.message_id}:${Date.now()}`,
chatId: options.msg.chat.id,
messageId: options.msg.message_id,
replyToMessageId: options.msg.reply_to_message?.message_id,
@@ -274,6 +275,10 @@ export async function prepareUnifiedAiRequestPipeline(params: {
await streamMessage.storeInternalAttachment(ragArtifact);
}
if (prepared.preparedDocumentRag) {
recordRagRun();
}
return {
stage: "document_rag",
status: prepared.preparedDocumentRag ? "succeeded" : "skipped",
@@ -313,11 +318,13 @@ export async function prepareUnifiedAiRequestPipeline(params: {
"audit_finish",
],
onFallback: async decision => {
recordPipelineFallback(decision.action);
if (decision.action === "use_alternate_target") {
aiLog("warn", "request.fallback.use_alternate_target", {
provider: options.provider,
stage: decision.stage,
reason: decision.reason,
requestId: state.requestId,
...buildToolRankFallbackTargetDetails(options.provider, config),
});
}
@@ -327,6 +334,7 @@ export async function prepareUnifiedAiRequestPipeline(params: {
provider: options.provider,
stage: decision.stage,
reason: decision.reason,
requestId: state.requestId,
});
}
+6 -1
View File
@@ -34,6 +34,7 @@ import {
} from "./text-to-speech";
import {persistFinalTextArtifactAttachment} from "./final-response-artifact-store";
import {aiLog} from "../logging/ai-logger";
import {recordPipelineFallback, recordTtsRun} from "../common/ai-observability.js";
function nowIso(): string {
return new Date().toISOString();
@@ -41,7 +42,7 @@ function nowIso(): string {
function createResponsePipelineState(options: UnifiedRunOptions): UserRequestPipelineState {
return {
requestId: `ai-response:${options.msg.chat.id}:${options.msg.message_id}:${Date.now()}`,
requestId: options.requestId ?? `ai-response:${options.msg.chat.id}:${options.msg.message_id}:${Date.now()}`,
chatId: options.msg.chat.id,
messageId: options.msg.message_id,
replyToMessageId: options.msg.reply_to_message?.message_id,
@@ -357,6 +358,7 @@ export async function runUnifiedAiResponsePipeline(params: {
name: "text_to_speech",
async run() {
const status = await synthesizeResponseIfRequested({options, config, streamMessage});
recordTtsRun(status);
return {
stage: "text_to_speech",
status,
@@ -396,11 +398,13 @@ export async function runUnifiedAiResponsePipeline(params: {
"audit_finish",
],
onFallback: async decision => {
recordPipelineFallback(decision.action);
if (decision.action === "use_alternate_target") {
aiLog("warn", "response.fallback.use_alternate_target", {
provider: options.provider,
stage: decision.stage,
reason: decision.reason,
requestId: state.requestId,
...buildToolRankFallbackTargetDetails(options.provider, config),
});
}
@@ -410,6 +414,7 @@ export async function runUnifiedAiResponsePipeline(params: {
provider: options.provider,
stage: decision.stage,
reason: decision.reason,
requestId: state.requestId,
});
}
+1
View File
@@ -78,6 +78,7 @@ function photoGenDir(): string {
export type UnifiedRunOptions = {
provider: AiProvider;
msg: Message;
requestId?: string;
isGuestMsg?: boolean;
text: string;
stream?: boolean;
+27 -11
View File
@@ -35,6 +35,7 @@ import {persistErrorArtifactAttachment} from "./final-response-artifact-store";
import {runUnifiedAiResponsePipeline} from "./unified-ai-response-pipeline";
import {AiRequestStore} from "../common/ai-request-store";
import type {StoredAiRequestStatus} from "../model/stored-ai-request";
import {recordAiRequestFinish, recordAiRequestStart} from "../common/ai-observability.js";
export type {ToolCallData} from "./unified-ai-runner.shared";
export {snapshotModel, providerTargets, ollamaModelNames} from "./unified-ai-runner.shared";
@@ -49,6 +50,7 @@ async function executeUnifiedAiRequest(
const requestStartedAt = Date.now();
let preparedRequest: Awaited<ReturnType<typeof prepareUnifiedAiRequestPipeline>> | undefined;
aiLog("info", "request.execute.start", {
requestId: options.requestId,
provider: providerName(options.provider),
stream: options.stream ?? true,
think: options.think,
@@ -74,6 +76,7 @@ async function executeUnifiedAiRequest(
if (preparedRequest.finishAfterTranscript) return;
aiLog("debug", "request.messages.collected", {
requestId: options.requestId,
provider: providerName(options.provider),
chatMessages: preparedRequest.chatMessages.length,
imageCount: preparedRequest.imageCount,
@@ -91,6 +94,7 @@ async function executeUnifiedAiRequest(
controller,
});
aiLog("success", "request.execute.done", {
requestId: options.requestId,
provider: providerName(options.provider),
duration: aiLogDuration(requestStartedAt),
responseChars: streamMessage.getText().length,
@@ -99,6 +103,7 @@ async function executeUnifiedAiRequest(
return;
} catch (e) {
aiLog("error", "request.execute.failed", {
requestId: options.requestId,
provider: providerName(options.provider),
duration: aiLogDuration(requestStartedAt),
error: e instanceof Error ? e : String(e),
@@ -117,6 +122,7 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise<void> {
const requestedAttachmentKinds = await collectRequestedAttachmentKinds(options.msg);
aiLog("info", "run.start", {
requestId: options.requestId ?? `pending:${options.msg.chat.id}:${options.msg.message_id}`,
provider: providerName(options.provider),
model: snapshotModel(options.provider, config),
message: aiLogMessageIdentity(options.msg),
@@ -133,6 +139,7 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise<void> {
if (await rejectUnsupportedAttachments(options.provider, snapshotModel(options.provider, config), options.msg, config, requestedAttachmentKinds)) {
aiLog("warn", "run.rejected.unsupported_attachment", {
requestId: options.requestId ?? `pending:${options.msg.chat.id}:${options.msg.message_id}`,
provider: providerName(options.provider),
requestedAttachmentKinds: [...requestedAttachmentKinds],
});
@@ -150,6 +157,7 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise<void> {
text: Environment.getAttachmentMissingFromCacheText(cached.missing[0].fileName),
}).catch(logError);
aiLog("warn", "run.rejected.missing_attachment_cache", {
requestId: options.requestId ?? `pending:${options.msg.chat.id}:${options.msg.message_id}`,
missing: cached.missing.map(a => ({kind: a.kind, fileName: a.fileName, cachePath: a.cachePath})),
});
return;
@@ -166,6 +174,8 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise<void> {
provider: providerName(options.provider),
controller
});
options.requestId ??= cancel.id;
const requestId = options.requestId;
const streamMessage = new TelegramStreamMessage(
options.msg,
cancel.id,
@@ -180,10 +190,11 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise<void> {
);
cancel.onCancel = () => streamMessage.cancel(cancel.provider);
const queueTarget = resolveAiRequestQueueTarget(options, config, requestedAttachmentKinds);
aiLog("debug", "run.queue.target", {target: aiLogProviderTarget(queueTarget), cancelId: cancel.id});
aiLog("debug", "run.queue.target", {requestId, target: aiLogProviderTarget(queueTarget), cancelId: cancel.id});
const aiRequestStartedAt = new Date().toISOString();
recordAiRequestStart();
await AiRequestStore.put({
requestId: cancel.id,
requestId,
chatId: options.msg.chat.id,
messageId: options.msg.message_id,
fromId: options.msg.from?.id ?? 0,
@@ -197,7 +208,7 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise<void> {
const queueMessage = await streamMessage.start(Environment.waitThinkText);
responseMessageId = queueMessage.message_id;
await AiRequestStore.put({
requestId: cancel.id,
requestId,
chatId: options.msg.chat.id,
messageId: options.msg.message_id,
responseMessageId,
@@ -207,8 +218,9 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise<void> {
status: "running",
startedAt: aiRequestStartedAt,
}).catch(logError);
setAiCancelMessageId(cancel.id, queueMessage.message_id);
setAiCancelMessageId(requestId, queueMessage.message_id);
aiLog("info", "run.queue.enter", {
requestId,
cancelId: cancel.id,
queueMessageId: queueMessage.message_id,
target: aiLogProviderTarget(queueTarget),
@@ -217,15 +229,16 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise<void> {
await aiProviderRequestQueue.enqueue(queueTarget, {
signal: controller.signal,
onPositionChange: async requestsBefore => {
aiLog("debug", "run.queue.position", {cancelId: cancel.id, requestsBefore});
aiLog("debug", "run.queue.position", {requestId, cancelId: cancel.id, requestsBefore});
streamMessage.setStatus(Environment.getAiQueueText(options.provider, requestsBefore));
await streamMessage.flush();
},
run: async (): Promise<null> => {
const queueWaitFinishedAt = Date.now();
aiLog("info", "run.queue.dequeued", {cancelId: cancel.id});
aiLog("info", "run.queue.dequeued", {requestId, cancelId: cancel.id});
const downloads = attachmentsToDownloadedFiles(cached.attachments);
aiLog("debug", "run.downloads.ready", {
requestId,
count: downloads.length,
downloads: downloads.map(d => ({
kind: d.kind,
@@ -239,12 +252,13 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise<void> {
await executeUnifiedAiRequest(options, config, downloads, controller, streamMessage);
aiRequestStatus = "succeeded";
aiLog("success", "run.queue.task.done", {
requestId,
cancelId: cancel.id,
duration: aiLogDuration(queueWaitFinishedAt),
});
} finally {
cleanupDownloads(downloads);
aiLog("debug", "run.downloads.cleaned", {cancelId: cancel.id, count: downloads.length});
aiLog("debug", "run.downloads.cleaned", {requestId, cancelId: cancel.id, count: downloads.length});
}
return null;
},
@@ -253,13 +267,13 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise<void> {
if (controller.signal.aborted || isAbortError(e instanceof Error ? e : String(e))) {
aiRequestStatus = "aborted";
aiRequestError = e instanceof Error ? e.message : String(e);
aiLog("warn", "run.aborted", {cancelId: cancel.id, duration: aiLogDuration(startedAt), error: e instanceof Error ? e : String(e)});
aiLog("warn", "run.aborted", {requestId, cancelId: cancel.id, duration: aiLogDuration(startedAt), error: e instanceof Error ? e : String(e)});
streamMessage.replaceText(streamMessage.getText());
await streamMessage.finish();
} else {
aiRequestStatus = "failed";
aiRequestError = e instanceof Error ? e.message : String(e);
aiLog("error", "run.failed", {cancelId: cancel.id, duration: aiLogDuration(startedAt), error: e instanceof Error ? e : String(e)});
aiLog("error", "run.failed", {requestId, cancelId: cancel.id, duration: aiLogDuration(startedAt), error: e instanceof Error ? e : String(e)});
const errorMessage = e instanceof Error ? e.message : String(e);
await streamMessage.fail(e instanceof Error ? e : String(e));
try {
@@ -279,7 +293,7 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise<void> {
} finally {
clearTimeout(timeout);
await AiRequestStore.put({
requestId: cancel.id,
requestId,
chatId: options.msg.chat.id,
messageId: options.msg.message_id,
responseMessageId,
@@ -291,8 +305,10 @@ export async function runUnifiedAi(options: UnifiedRunOptions): Promise<void> {
finishedAt: new Date().toISOString(),
error: aiRequestError,
}).catch(logError);
finishAiRequest(cancel.id);
recordAiRequestFinish(aiRequestStatus);
finishAiRequest(requestId);
aiLog("success", "run.finished", {
requestId,
cancelId: cancel.id,
provider: providerName(options.provider),
duration: aiLogDuration(startedAt),