564 lines
23 KiB
TypeScript
564 lines
23 KiB
TypeScript
// OpenAI and OpenAI-compatible provider runners extracted from unified-ai-runner.ts.
|
|
import {Message} from "typescript-telegram-bot-api";
|
|
import {Environment} from "../common/environment";
|
|
import {getOpenAITools} from "./tool-mappers";
|
|
import {TelegramStreamMessage} from "./telegram-stream-message";
|
|
import {ToolRuntimeContext} from "./tools/runtime";
|
|
import {OpenAIChatMessage} from "./openai-chat-message";
|
|
import type {
|
|
ResponseCreateParamsNonStreaming,
|
|
ResponseCreateParamsStreaming,
|
|
ResponseInputItem,
|
|
ResponseStreamEvent
|
|
} from "openai/resources/responses/responses";
|
|
import type {
|
|
ChatCompletionCreateParamsNonStreaming,
|
|
ChatCompletionCreateParamsStreaming
|
|
} from "openai/resources/chat/completions";
|
|
import {createGeminiOpenAiClient, createOpenAiClient} from "./ai-runtime-target";
|
|
import {aiLog, aiLogDuration, aiLogMessageIdentity, aiLogProviderTarget, aiLogToolCall} from "../logging/ai-logger";
|
|
|
|
import {
|
|
AsyncIterableStream,
|
|
collectOpenAiResponseFunctionCalls,
|
|
collectOpenAiResponseImages,
|
|
collectOpenAiResponseText,
|
|
executeToolBatch,
|
|
getOpenAIResponsesToolsWithImage,
|
|
isRecord,
|
|
MAX_TOOL_ROUNDS,
|
|
OPENAI_IMAGE_PARTIALS,
|
|
OpenAiChatCompletionResponseLike,
|
|
OpenAiChatCompletionStreamChunkLike,
|
|
OpenAiChatToolCallLike,
|
|
OpenAiCompatibleChatMessage,
|
|
OpenAiCompatibleContentPart,
|
|
openAiResponseItemCallId,
|
|
OpenAiResponseLike,
|
|
OpenAiResponseOutputItem,
|
|
roundStatus,
|
|
RuntimeConfigSnapshot,
|
|
safeJsonParseObject,
|
|
showOpenAiGeneratedImage,
|
|
StreamingToolCallAccumulator,
|
|
ToolCallData,
|
|
ToolExecutionMemory
|
|
} from "./unified-ai-runner.shared";
|
|
import {GetNoteFileResult, GetNoteFileResultSchema} from "./tools/send-note-file";
|
|
import {bot, notesDir} from "../index";
|
|
import fs from "node:fs";
|
|
import path from "node:path";
|
|
import {logError} from "../util/utils";
|
|
|
|
/**
 * Sends every note attachment reported by a batch of tool results as a reply to `msg`.
 *
 * Each result is parsed as JSON and validated with `GetNoteFileResultSchema`. Results that are
 * not JSON, fail validation, are not successful, or carry no `attachment` are skipped.
 * A failed upload is handed to `logError` and does not interrupt the remaining uploads.
 *
 * @param msg Message whose chat receives the documents and which the documents reply to.
 * @param toolResults Raw tool outputs of one tool batch.
 */
async function sendResponsesNoteAttachments(msg: Message, toolResults: readonly string[]): Promise<void> {
    for (const toolResult of toolResults) {
        let noteFile: GetNoteFileResult | undefined;
        try {
            const parsed = GetNoteFileResultSchema.safeParse(JSON.parse(toolResult));
            if (parsed.success && parsed.data.success) noteFile = parsed.data;
        } catch {
            // Not every tool result is JSON.
        }
        if (!noteFile || !("attachment" in noteFile)) continue;

        // NOTE(review): relativePath is joined under notesDir without a containment check —
        // presumably the tool that produced it already validates the path; confirm.
        await bot.sendDocument({
            chat_id: msg.chat.id,
            reply_parameters: {
                message_id: msg.message_id,
            },
            document: fs.createReadStream(path.join(notesDir, noteFile.attachment.relativePath)),
        }).catch(logError);
    }
}

/**
 * Drains one streamed Responses API round, mirroring progress into `streamMessage`.
 *
 * Text deltas are appended, image-generation and function-call events update the status line,
 * and partial images are shown as they arrive. Event types without a case are ignored.
 *
 * @returns The response carried by the `response.completed` event.
 * @throws Error "Aborted" when `signal` is aborted between events, the provider message on
 *         `response.failed` / `error` events, or when the stream ends without `response.completed`.
 */
async function consumeOpenAiResponseStream(
    events: AsyncIterableStream<ResponseStreamEvent>,
    streamMessage: TelegramStreamMessage,
    sourceMessage: Message,
    signal: AbortSignal,
    round: number,
): Promise<OpenAiResponseLike> {
    let completedResponse: OpenAiResponseLike | null = null;
    // Function calls announced by the stream whose output item is not yet done.
    const localToolCalls: ToolCallData[] = [];

    for await (const event of events) {
        if (signal.aborted) throw new Error("Aborted");

        switch (event.type) {
            case "response.output_text.delta":
                streamMessage.append(event.delta ?? "");
                break;
            case "response.image_generation_call.in_progress":
                streamMessage.setStatus(Environment.startingImageGenText);
                await streamMessage.flush();
                break;
            case "response.image_generation_call.generating":
                streamMessage.setStatus(Environment.imageGenText);
                await streamMessage.flush();
                break;
            case "response.image_generation_call.partial_image": {
                const iteration = (event.partial_image_index ?? 0) + 1;
                await showOpenAiGeneratedImage(
                    streamMessage,
                    sourceMessage,
                    event.partial_image_b64,
                    `partial_${round}_${iteration}`,
                    Environment.getPartialImageGenText(iteration, OPENAI_IMAGE_PARTIALS),
                    false,
                );
                break;
            }
            case "response.image_generation_call.completed":
                streamMessage.setStatus(Environment.finalizingImageGenText);
                await streamMessage.flush();
                break;
            case "response.output_item.added":
                if (event.item.type === "function_call" && event.item.name) {
                    const item = event.item as OpenAiResponseOutputItem & { id?: string };
                    localToolCalls.push({
                        id: openAiResponseItemCallId(item),
                        name: item.name ?? "",
                        argumentsText: item.arguments ?? "{}",
                    });

                    aiLog("info", "openai.stream.tool_call.added", {
                        round,
                        toolCalls: localToolCalls.map(aiLogToolCall)
                    });
                    streamMessage.setStatus(Environment.getUseToolText(localToolCalls));
                    await streamMessage.flush();
                }
                break;
            case "response.output_item.done":
                if (event.item.type === "function_call" && event.item.name) {
                    const item = event.item as OpenAiResponseOutputItem & { id?: string };
                    const itemId = openAiResponseItemCallId(item);
                    const index = localToolCalls.findIndex(c => c.id === itemId);
                    if (index !== -1) {
                        localToolCalls.splice(index, 1);
                        if (localToolCalls.length === 0) {
                            streamMessage.clearStatus();
                        } else {
                            streamMessage.setStatus(Environment.getUseToolText(localToolCalls));
                        }
                        await streamMessage.flush();
                    }
                }
                break;
            case "response.completed":
                completedResponse = event.response as unknown as OpenAiResponseLike;
                break;
            case "response.failed":
                throw new Error(event.response?.error?.message ?? "OpenAI response failed");
            case "error":
                throw new Error(event.message ?? "OpenAI stream error");
            // Argument deltas and all other event types need no handling here: the final
            // function calls are read from the completed response.
        }
    }

    if (!completedResponse) throw new Error("OpenAI did not return the final response.completed event.");
    return completedResponse;
}

/**
 * Runs a conversation against the OpenAI Responses API, executing requested tools
 * for up to `MAX_TOOL_ROUNDS` rounds.
 *
 * Per round: the model is called (streaming or not), generated text is appended to
 * `streamMessage`, the last generated image (if any) is shown, and requested function calls are
 * executed via `executeToolBatch`. Tool outputs plus the model's output items are appended to
 * the input of the next round. The function returns as soon as a round requests no tools, and
 * also returns (without a final answer) when the round limit is exhausted.
 *
 * @param msg Message used as reply target for note attachments.
 * @param messages Initial conversation passed as Responses API input.
 * @param streamMessage Telegram message that receives text, status and images.
 * @param signal Aborts the provider request and the stream loop.
 * @param stream Whether to use the streaming API.
 * @param firstRoundStatus Currently unused (see TODO below).
 * @param sourceMessage Message passed to image display and logging.
 * @param config Runtime targets and system prompt.
 * @param toolContext Context handed to tool execution.
 */
export async function runOpenAi(
    msg: Message,
    messages: OpenAIChatMessage[],
    streamMessage: TelegramStreamMessage,
    signal: AbortSignal,
    stream: boolean,
    firstRoundStatus: string,
    sourceMessage: Message,
    config: RuntimeConfigSnapshot,
    toolContext: ToolRuntimeContext,
): Promise<void> {
    // TODO: 13.05.2026: remove
    void firstRoundStatus;

    const runnerStartedAt = Date.now();
    let responseInput: unknown[] = [...messages];
    const openAi = createOpenAiClient(config.openAiChatTarget);

    aiLog("info", "openai.run.start", {
        stream,
        target: aiLogProviderTarget(config.openAiChatTarget),
        imageTarget: aiLogProviderTarget(config.openAiImageTarget),
        inputMessages: messages.length,
        sourceMessage: aiLogMessageIdentity(sourceMessage),
        hasToolInputFiles: !!toolContext.pythonInputFiles?.length,
    });

    const toolMemory: ToolExecutionMemory = new Map();

    for (let round = 0; round < MAX_TOOL_ROUNDS; round++) {
        const roundStartedAt = Date.now();
        aiLog("debug", "openai.round.start", {round, inputItems: responseInput.length, stream});

        let response: OpenAiResponseLike;
        if (stream) {
            const request: ResponseCreateParamsStreaming = {
                model: config.openAiChatTarget.model,
                input: responseInput as ResponseInputItem[],
                stream: true,
                tools: getOpenAIResponsesToolsWithImage(config) as ResponseCreateParamsStreaming["tools"],
                // Kept in sync with the non-streaming request so both modes see the system prompt.
                instructions: config.systemPrompt,
            };
            const events = await openAi.responses.create(request, {signal}) as unknown as AsyncIterableStream<ResponseStreamEvent>;

            aiLog("debug", "openai.stream.open", {round});

            response = await consumeOpenAiResponseStream(events, streamMessage, sourceMessage, signal, round);

            aiLog("debug", "openai.stream.completed", {
                round,
                duration: aiLogDuration(roundStartedAt),
                outputItems: response?.output?.length ?? 0,
            });
        } else {
            const request: ResponseCreateParamsNonStreaming = {
                model: config.openAiChatTarget.model,
                input: responseInput as ResponseInputItem[],
                tools: getOpenAIResponsesToolsWithImage(config) as ResponseCreateParamsNonStreaming["tools"],
                instructions: config.systemPrompt,
            };
            response = await openAi.responses.create(request, {signal}) as unknown as OpenAiResponseLike;

            const responseText = collectOpenAiResponseText(response);
            streamMessage.append(responseText);
            aiLog("debug", "openai.response.received", {
                round,
                duration: aiLogDuration(roundStartedAt),
                textChars: responseText.length,
                outputItems: response?.output?.length ?? 0,
            });
        }

        // Only the last image of the round is shown as the final result.
        const images = collectOpenAiResponseImages(response);
        if (images.length) {
            await showOpenAiGeneratedImage(
                streamMessage,
                sourceMessage,
                images[images.length - 1],
                `final_${round}`,
                Environment.getImageGenDoneText(config.openAiImageTarget.model),
                true,
            );
        }

        const calls = collectOpenAiResponseFunctionCalls(response);
        aiLog(calls.length ? "info" : "success", calls.length ? "openai.tool_calls" : "openai.run.done", {
            round,
            duration: calls.length ? aiLogDuration(roundStartedAt) : aiLogDuration(runnerStartedAt),
            calls: calls.map(call => ({
                id: call.callId,
                name: call.name,
                arguments: safeJsonParseObject(call.argumentsText)
            })),
        });
        // No tool requests means the model produced its final answer.
        if (!calls.length) return;

        const toolCalls = calls.map(call => ({
            id: call.callId,
            name: call.name,
            argumentsText: call.argumentsText,
        }));
        const toolResults = await executeToolBatch(toolCalls, streamMessage, toolContext, toolMemory);

        await sendResponsesNoteAttachments(msg, toolResults);

        const toolOutputs = calls.map((call, index) => ({
            type: "function_call_output" as const,
            call_id: call.callId,
            output: toolResults[index] ?? "",
        }));
        // The model's own output items must precede the outputs that answer its function calls.
        responseInput = [...responseInput, ...(response.output ?? []), ...toolOutputs];
    }
}
|
|
|
|
|
|
/**
 * Flattens Responses-style message content into plain text.
 *
 * A string is returned unchanged; anything that is not an array yields "". For an array, each
 * record part contributes the first of its `text`, `content`, `refusal` fields that holds a
 * string, and the contributions are concatenated without a separator. Non-record parts and
 * parts whose fields are not strings (e.g. nested objects or arrays) contribute nothing, so
 * they can no longer leak "[object Object]" into the text.
 *
 * @param content Message content of unknown shape.
 * @returns The concatenated text, possibly empty.
 */
function openAiResponseContentToText(content: unknown): string {
    if (typeof content === "string") return content;
    if (!Array.isArray(content)) return "";

    return content
        .map((part: unknown): string => {
            if (!isRecord(part)) return "";
            const candidates: unknown[] = [part.text, part.content, part.refusal];
            return candidates.find((value): value is string => typeof value === "string") ?? "";
        })
        .join("");
}
|
|
|
|
/**
 * Converts Responses-style messages into Chat Completions messages.
 *
 * `system` and `assistant` messages keep their role and get their content flattened to text via
 * `openAiResponseContentToText`. Every other message is emitted with role `user`: array content
 * is mapped part by part (`input_image` parts become `image_url` parts, everything else becomes
 * a `text` part), non-array content is passed through unchanged.
 */
function openAiResponseMessagesToChatCompletions(messages: OpenAIChatMessage[]): OpenAiCompatibleChatMessage[] {
    // Maps one Responses content part to its Chat Completions counterpart.
    const toChatPart = (part: unknown): OpenAiCompatibleContentPart => {
        if (!isRecord(part)) {
            return {type: "text", text: ""};
        }
        if (part.type === "input_image") {
            return {type: "image_url", image_url: {url: String(part.image_url ?? "")}};
        }
        const text = typeof part.text === "string" ? part.text : "";
        return {type: "text", text};
    };

    const converted: OpenAiCompatibleChatMessage[] = [];
    for (const message of messages) {
        if (message.role === "system" || message.role === "assistant") {
            converted.push({
                role: message.role,
                content: openAiResponseContentToText(message.content),
            });
            continue;
        }

        let content;
        if (Array.isArray(message.content)) {
            content = message.content.map((part): OpenAiCompatibleContentPart => toChatPart(part));
        } else {
            content = message.content;
        }
        converted.push({role: "user", content});
    }
    return converted;
}
|
|
|
|
/**
 * Normalizes Chat Completions tool calls into `ToolCallData`.
 *
 * The name is read from `function.name`, falling back to the top-level `name`; calls without a
 * name are dropped. Arguments are read from `function.arguments`, falling back to the top-level
 * `arguments`: a string is used verbatim (it is already serialized JSON), any other value is
 * serialized with `JSON.stringify`, and a missing value becomes "{}". A missing id is replaced
 * by a generated `openai_chat_<timestamp>_<index>` id.
 *
 * @param toolCalls Tool calls as returned by the provider; defaults to an empty list.
 * @returns The named tool calls in their original order.
 */
function normalizeOpenAiChatToolCalls(toolCalls: OpenAiChatToolCallLike[] = []): ToolCallData[] {
    return toolCalls
        .map((call, i): ToolCallData => {
            // Resolve the source first so a top-level string is not JSON-encoded a second time.
            const rawArguments: unknown = call.function?.arguments ?? call.arguments;
            return {
                id: call.id || `openai_chat_${Date.now()}_${i}`,
                name: call.function?.name || call.name || "",
                argumentsText: typeof rawArguments === "string"
                    ? rawArguments
                    : JSON.stringify(rawArguments ?? {}),
            };
        })
        .filter(call => call.name);
}
|
|
|
|
/**
 * Appends one `tool` message per executed call to `messages` (mutated in place).
 *
 * The result for a call is looked up by the call's position in `calls`; a missing result is
 * recorded as an empty string.
 */
async function appendOpenAiChatToolResults(
    messages: OpenAiCompatibleChatMessage[],
    calls: ToolCallData[],
    results: string[],
): Promise<void> {
    calls.forEach((toolCall, position) => {
        const output = results[position] ?? "";
        messages.push({
            role: "tool",
            tool_call_id: toolCall.id,
            content: output,
        });
    });
}
|
|
|
|
/**
 * Sends every note attachment reported by a batch of tool results as a reply to `msg`.
 *
 * Each result is parsed as JSON and validated with `GetNoteFileResultSchema`. Results that are
 * not JSON, fail validation, are not successful, or carry no `attachment` are skipped.
 * A failed upload is handed to `logError` and does not interrupt the remaining uploads.
 *
 * @param msg Message whose chat receives the documents and which the documents reply to.
 * @param toolResults Raw tool outputs of one tool batch.
 */
async function sendChatNoteAttachments(msg: Message, toolResults: readonly string[]): Promise<void> {
    for (const toolResult of toolResults) {
        let noteFile: GetNoteFileResult | undefined;
        try {
            const parsed = GetNoteFileResultSchema.safeParse(JSON.parse(toolResult));
            if (parsed.success && parsed.data.success) noteFile = parsed.data;
        } catch {
            // Not every tool result is JSON.
        }
        if (!noteFile || !("attachment" in noteFile)) continue;

        // NOTE(review): relativePath is joined under notesDir without a containment check —
        // presumably the tool that produced it already validates the path; confirm.
        await bot.sendDocument({
            chat_id: msg.chat.id,
            reply_parameters: {
                message_id: msg.message_id,
            },
            document: fs.createReadStream(path.join(notesDir, noteFile.attachment.relativePath)),
        }).catch(logError);
    }
}

/**
 * Runs a conversation against an OpenAI-compatible Chat Completions endpoint (the client is
 * built from `config.geminiChatTarget`), executing requested tools for up to
 * `MAX_TOOL_ROUNDS` rounds.
 *
 * Per round: the status line is set from `roundStatus`, the model is called (streaming or not),
 * its text is appended to `streamMessage`, and requested tool calls are executed via
 * `executeToolBatch`. The assistant turn and one `tool` message per call are appended to the
 * chat history for the next round. The function returns as soon as a round requests no tools,
 * and also returns (without a final answer) when the round limit is exhausted.
 *
 * @param msg Message used as reply target for note attachments.
 * @param messages Initial conversation in Responses format; converted to chat messages.
 * @param streamMessage Telegram message that receives text and status.
 * @param signal Aborts the provider request and the stream loop.
 * @param stream Whether to use the streaming API.
 * @param firstRoundStatus Status input for `roundStatus`.
 * @param config Runtime targets.
 * @param toolContext Context handed to tool execution.
 */
export async function runOpenAiCompatibleChat(
    msg: Message,
    messages: OpenAIChatMessage[],
    streamMessage: TelegramStreamMessage,
    signal: AbortSignal,
    stream: boolean,
    firstRoundStatus: string,
    config: RuntimeConfigSnapshot,
    toolContext: ToolRuntimeContext,
): Promise<void> {
    const runnerStartedAt = Date.now();
    const geminiOpenAi = createGeminiOpenAiClient(config.geminiChatTarget);
    const chatMessages = openAiResponseMessagesToChatCompletions(messages);
    const toolMemory: ToolExecutionMemory = new Map();

    aiLog("info", "openai_compatible.run.start", {
        stream,
        target: aiLogProviderTarget(config.geminiChatTarget),
        inputMessages: messages.length,
        chatMessages: chatMessages.length,
        hasToolInputFiles: !!toolContext.pythonInputFiles?.length,
    });

    for (let round = 0; round < MAX_TOOL_ROUNDS; round++) {
        const roundStartedAt = Date.now();
        aiLog("debug", "openai_compatible.round.start", {round, messages: chatMessages.length, stream});
        streamMessage.setStatus(roundStatus(round, firstRoundStatus) ?? "");
        await streamMessage.flush();

        // Text and tool calls produced by this round, filled by either transport below.
        let roundText: string;
        let calls: ToolCallData[];

        if (stream) {
            const request: ChatCompletionCreateParamsStreaming = {
                model: config.geminiChatTarget.model,
                messages: chatMessages,
                tools: getOpenAITools(),
                temperature: 0.6,
                stream: true,
            };
            const chunks = await geminiOpenAi.chat.completions.create(request, {signal}) as unknown as AsyncIterableStream<OpenAiChatCompletionStreamChunkLike>;

            aiLog("debug", "openai_compatible.stream.open", {round});

            // Remember where this round's text starts so it can be sliced out afterwards.
            const roundTextStart = streamMessage.getText().length;
            const toolCallAccumulator = new StreamingToolCallAccumulator("openai_chat_stream", round);
            calls = [];

            for await (const chunk of chunks) {
                if (signal.aborted) throw new Error("Aborted");

                const delta = chunk.choices?.[0]?.delta;
                streamMessage.append(delta?.content ?? "");

                if (delta?.tool_calls?.length) {
                    calls = toolCallAccumulator.add(delta.tool_calls);
                    streamMessage.setStatus(Environment.getUseToolText(calls));
                    await streamMessage.flush();
                }
            }

            roundText = streamMessage.getText().slice(roundTextStart);
        } else {
            const request: ChatCompletionCreateParamsNonStreaming = {
                model: config.geminiChatTarget.model,
                messages: chatMessages,
                tools: getOpenAITools(),
                temperature: 0.6,
            };
            const response = await geminiOpenAi.chat.completions.create(request, {signal}) as unknown as OpenAiChatCompletionResponseLike;
            const message = response.choices?.[0]?.message;

            roundText = message?.content ?? "";
            streamMessage.append(roundText);
            calls = normalizeOpenAiChatToolCalls(message?.tool_calls ?? []);
        }

        aiLog(calls.length ? "info" : "success", calls.length ? "openai_compatible.tool_calls" : "openai_compatible.run.done", {
            round,
            duration: calls.length ? aiLogDuration(roundStartedAt) : aiLogDuration(runnerStartedAt),
            textChars: roundText.length,
            calls: calls.map(aiLogToolCall),
        });
        // No tool requests means the model produced its final answer.
        if (!calls.length) return;

        // The assistant turn with its tool calls must precede the tool messages answering it.
        chatMessages.push({
            role: "assistant",
            content: roundText,
            tool_calls: calls.map(call => ({
                id: call.id,
                type: "function" as const,
                function: {
                    name: call.name,
                    arguments: call.argumentsText,
                },
            })),
        });

        const toolResults = await executeToolBatch(calls, streamMessage, toolContext, toolMemory);

        await sendChatNoteAttachments(msg, toolResults);

        await appendOpenAiChatToolResults(chatMessages, calls, toolResults);
    }
}
|