This commit is contained in:
2026-05-13 10:18:54 +03:00
parent cd8d2683c0
commit c5b61ee3d8
38 changed files with 3929 additions and 3718 deletions
+426
View File
@@ -0,0 +1,426 @@
// OpenAI and OpenAI-compatible provider runners extracted from unified-ai-runner.ts.
import {Message} from "typescript-telegram-bot-api";
import {Environment} from "../common/environment";
import {getOpenAITools} from "./tool-mappers";
import {TelegramStreamMessage} from "./telegram-stream-message";
import {ToolRuntimeContext} from "./tools/runtime";
import {OpenAIChatMessage} from "./openai-chat-message";
import type {ResponseCreateParamsNonStreaming, ResponseCreateParamsStreaming, ResponseInputItem, ResponseStreamEvent} from "openai/resources/responses/responses";
import type {ChatCompletionCreateParamsNonStreaming, ChatCompletionCreateParamsStreaming} from "openai/resources/chat/completions";
import {createGeminiOpenAiClient, createOpenAiClient} from "./ai-runtime-target";
import {aiLog, aiLogDuration, aiLogMessageIdentity, aiLogProviderTarget, aiLogToolCall} from "./ai-logger";
import {AsyncIterableStream, MAX_TOOL_ROUNDS, OPENAI_IMAGE_PARTIALS, OpenAiChatCompletionResponseLike, OpenAiChatToolCallLike, OpenAiCompatibleChatMessage, OpenAiCompatibleContentPart, OpenAiResponseLike, OpenAiResponseOutputItem, RuntimeConfigSnapshot, ToolCallData, StreamingToolCallAccumulator, collectOpenAiResponseFunctionCalls, collectOpenAiResponseImages, collectOpenAiResponseText, executeToolBatch, getOpenAIResponsesToolsWithImage, openAiResponseItemCallId, safeJsonParseObject, showOpenAiGeneratedImage, ToolExecutionMemory, isRecord, roundStatus, OpenAiChatCompletionStreamChunkLike} from "./unified-ai-runner.shared";
export async function runOpenAi(
messages: OpenAIChatMessage[],
streamMessage: TelegramStreamMessage,
signal: AbortSignal,
stream: boolean,
firstRoundStatus: string,
sourceMessage: Message,
config: RuntimeConfigSnapshot,
toolContext: ToolRuntimeContext,
): Promise<void> {
// TODO: 13.05.2026: remove
firstRoundStatus;
const runnerStartedAt = Date.now();
let responseInput: unknown[] = [...messages];
const openAi = createOpenAiClient(config.openAiChatTarget);
aiLog("info", "openai.run.start", {
stream,
target: aiLogProviderTarget(config.openAiChatTarget),
imageTarget: aiLogProviderTarget(config.openAiImageTarget),
inputMessages: messages.length,
sourceMessage: aiLogMessageIdentity(sourceMessage),
hasToolInputFiles: !!toolContext.pythonInputFiles?.length,
});
const toolMemory: ToolExecutionMemory = new Map();
for (let round = 0; round < MAX_TOOL_ROUNDS; round++) {
const roundStartedAt = Date.now();
aiLog("debug", "openai.round.start", {round, inputItems: responseInput.length, stream});
if (!stream) {
const request: ResponseCreateParamsNonStreaming = {
model: config.openAiChatTarget.model,
input: responseInput as ResponseInputItem[],
// TODO: 13.05.2026, Danil Nikolaev: fix
tools: getOpenAIResponsesToolsWithImage(config) as any,
instructions: config.systemPrompt,
};
const response = await openAi.responses.create(request, {signal}) as unknown as OpenAiResponseLike;
const responseText = collectOpenAiResponseText(response);
streamMessage.append(responseText);
aiLog("debug", "openai.response.received", {
round,
duration: aiLogDuration(roundStartedAt),
textChars: responseText.length,
outputItems: response?.output?.length ?? 0,
});
const images = collectOpenAiResponseImages(response);
if (images.length) {
await showOpenAiGeneratedImage(
streamMessage,
sourceMessage,
images[images.length - 1],
`final_${round}`,
Environment.getImageGenDoneText(config.openAiImageTarget.model),
true,
);
}
const calls = collectOpenAiResponseFunctionCalls(response);
aiLog(calls.length ? "info" : "success", calls.length ? "openai.tool_calls" : "openai.run.done", {
round,
duration: calls.length ? aiLogDuration(roundStartedAt) : aiLogDuration(runnerStartedAt),
calls: calls.map(call => ({
id: call.callId,
name: call.name,
arguments: safeJsonParseObject(call.argumentsText)
})),
});
if (!calls.length) return;
const toolCalls = calls.map(call => ({
id: call.callId,
name: call.name,
argumentsText: call.argumentsText,
}));
const toolResults = await executeToolBatch(toolCalls, streamMessage, toolContext, toolMemory);
const toolOutputs = calls.map((call, index) => ({
type: "function_call_output" as const,
call_id: call.callId,
output: toolResults[index] ?? "",
}));
responseInput = [...responseInput, ...(response.output ?? []), ...toolOutputs];
continue;
}
let completedResponse: OpenAiResponseLike | null = null;
const request: ResponseCreateParamsStreaming = {
model: config.openAiChatTarget.model,
input: responseInput as ResponseInputItem[],
stream: true,
// TODO: 13.05.2026, Danil Nikolaev: fix
tools: getOpenAIResponsesToolsWithImage(config) as any,
};
const response = await openAi.responses.create(request, {signal}) as unknown as AsyncIterableStream<ResponseStreamEvent>;
aiLog("debug", "openai.stream.open", {round});
let localToolCalls: ToolCallData[] = [];
for await (const event of response) {
if (signal.aborted) throw new Error("Aborted");
switch (event.type) {
case "response.output_text.delta":
streamMessage.append(event.delta ?? "");
break;
case "response.image_generation_call.in_progress":
streamMessage.setStatus(Environment.startingImageGenText);
await streamMessage.flush();
break;
case "response.image_generation_call.generating":
streamMessage.setStatus(Environment.imageGenText);
await streamMessage.flush();
break;
case "response.image_generation_call.partial_image": {
const iteration = (event.partial_image_index ?? 0) + 1;
await showOpenAiGeneratedImage(
streamMessage,
sourceMessage,
event.partial_image_b64,
`partial_${round}_${iteration}`,
Environment.getPartialImageGenText(iteration, OPENAI_IMAGE_PARTIALS),
false,
);
break;
}
case "response.image_generation_call.completed":
streamMessage.setStatus(Environment.finalizingImageGenText);
await streamMessage.flush();
break;
case "response.output_item.added":
if (event.item.type === "function_call" && event.item.name) {
const item = event.item as OpenAiResponseOutputItem & { id?: string };
localToolCalls.push({
id: openAiResponseItemCallId(item),
name: item.name ?? "",
argumentsText: item.arguments ?? "{}",
});
aiLog("info", "openai.stream.tool_call.added", {
round,
toolCalls: localToolCalls.map(aiLogToolCall)
});
streamMessage.setStatus(Environment.getUseToolText(localToolCalls));
await streamMessage.flush();
}
break;
case "response.output_item.done":
if (event.item.type === "function_call" && event.item.name) {
const item = event.item as OpenAiResponseOutputItem & { id?: string };
const itemId = openAiResponseItemCallId(item);
const index = localToolCalls.findIndex(c => c.id === itemId);
if (index !== -1) {
localToolCalls.splice(index, 1);
if (localToolCalls.length === 0) {
streamMessage.clearStatus();
} else {
streamMessage.setStatus(Environment.getUseToolText(localToolCalls));
}
await streamMessage.flush();
}
}
break;
case "response.function_call_arguments.delta":
break;
case "response.function_call_arguments.done":
break;
case "response.completed":
completedResponse = event.response as unknown as OpenAiResponseLike;
break;
case "response.failed":
throw new Error(event.response?.error?.message ?? "OpenAI response failed");
case "error":
throw new Error(event.message ?? event?.message ?? "OpenAI stream error");
}
}
if (!completedResponse) throw new Error("OpenAI did not return the final response.completed event.");
aiLog("debug", "openai.stream.completed", {
round,
duration: aiLogDuration(roundStartedAt),
outputItems: completedResponse?.output?.length ?? 0,
});
const images = collectOpenAiResponseImages(completedResponse);
if (images.length) {
await showOpenAiGeneratedImage(
streamMessage,
sourceMessage,
images[images.length - 1],
`final_${round}`,
Environment.getImageGenDoneText(config.openAiImageTarget.model),
true,
);
}
const calls = collectOpenAiResponseFunctionCalls(completedResponse);
aiLog(calls.length ? "info" : "success", calls.length ? "openai.tool_calls" : "openai.run.done", {
round,
duration: calls.length ? aiLogDuration(roundStartedAt) : aiLogDuration(runnerStartedAt),
calls: calls.map(call => ({
id: call.callId,
name: call.name,
arguments: safeJsonParseObject(call.argumentsText)
})),
});
if (!calls.length) return;
const toolCalls = calls.map(call => ({
id: call.callId,
name: call.name,
argumentsText: call.argumentsText,
}));
const toolResults = await executeToolBatch(toolCalls, streamMessage, toolContext, toolMemory);
const toolOutputs = calls.map((call, index) => ({
type: "function_call_output",
call_id: call.callId,
output: toolResults[index] ?? "",
}));
responseInput = [...responseInput, ...(completedResponse.output ?? []), ...toolOutputs];
}
}
function openAiResponseContentToText(content: unknown): string {
if (typeof content === "string") return content;
if (!Array.isArray(content)) return "";
return content.map(part => isRecord(part) ? part.text ?? part.content ?? part.refusal ?? "" : "").join("");
}
function openAiResponseMessagesToChatCompletions(messages: OpenAIChatMessage[]): OpenAiCompatibleChatMessage[] {
return messages.map((message): OpenAiCompatibleChatMessage => {
if (message.role === "system" || message.role === "assistant") {
return {
role: message.role,
content: openAiResponseContentToText(message.content),
};
}
const content = Array.isArray(message.content)
? message.content.map((part): OpenAiCompatibleContentPart => {
if (isRecord(part) && part.type === "input_image") {
return {
type: "image_url",
image_url: {url: String(part.image_url ?? "")},
};
}
return {
type: "text",
text: isRecord(part) && typeof part.text === "string" ? part.text : "",
};
})
: message.content;
return {role: "user", content};
});
}
function normalizeOpenAiChatToolCalls(toolCalls: OpenAiChatToolCallLike[] = []): ToolCallData[] {
return toolCalls.map((call, i) => ({
id: call.id || `openai_chat_${Date.now()}_${i}`,
name: call.function?.name || call.name || "",
argumentsText: typeof call.function?.arguments === "string"
? call.function.arguments
: JSON.stringify(call.function?.arguments ?? call.arguments ?? {}),
})).filter(call => call.name);
}
async function appendOpenAiChatToolResults(
messages: OpenAiCompatibleChatMessage[],
calls: ToolCallData[],
results: string[],
): Promise<void> {
for (const [index, call] of calls.entries()) {
messages.push({
role: "tool",
tool_call_id: call.id,
content: results[index] ?? "",
});
}
}
export async function runOpenAiCompatibleChat(
messages: OpenAIChatMessage[],
streamMessage: TelegramStreamMessage,
signal: AbortSignal,
stream: boolean,
firstRoundStatus: string,
config: RuntimeConfigSnapshot,
toolContext: ToolRuntimeContext,
): Promise<void> {
const runnerStartedAt = Date.now();
const geminiOpenAi = createGeminiOpenAiClient(config.geminiChatTarget);
const chatMessages = openAiResponseMessagesToChatCompletions(messages);
const toolMemory: ToolExecutionMemory = new Map();
aiLog("info", "openai_compatible.run.start", {
stream,
target: aiLogProviderTarget(config.geminiChatTarget),
inputMessages: messages.length,
chatMessages: chatMessages.length,
hasToolInputFiles: !!toolContext.pythonInputFiles?.length,
});
for (let round = 0; round < MAX_TOOL_ROUNDS; round++) {
const roundStartedAt = Date.now();
aiLog("debug", "openai_compatible.round.start", {round, messages: chatMessages.length, stream});
streamMessage.setStatus(roundStatus(round, firstRoundStatus) ?? "");
await streamMessage.flush();
if (!stream) {
const request: ChatCompletionCreateParamsNonStreaming = {
model: config.geminiChatTarget.model,
messages: chatMessages,
tools: getOpenAITools(),
temperature: 0.6,
};
const response = await geminiOpenAi.chat.completions.create(request, {signal}) as unknown as OpenAiChatCompletionResponseLike;
const message = response.choices?.[0]?.message;
streamMessage.append(message?.content ?? "");
const calls = normalizeOpenAiChatToolCalls(message?.tool_calls ?? []);
aiLog(calls.length ? "info" : "success", calls.length ? "openai_compatible.tool_calls" : "openai_compatible.run.done", {
round,
duration: calls.length ? aiLogDuration(roundStartedAt) : aiLogDuration(runnerStartedAt),
textChars: message?.content?.length ?? 0,
calls: calls.map(aiLogToolCall),
});
if (!calls.length) return;
chatMessages.push({
role: "assistant",
content: message?.content ?? "",
tool_calls: calls.map(call => ({
id: call.id,
type: "function" as const,
function: {
name: call.name,
arguments: call.argumentsText,
},
})),
});
await appendOpenAiChatToolResults(chatMessages, calls, await executeToolBatch(calls, streamMessage, toolContext, toolMemory));
continue;
}
const request: ChatCompletionCreateParamsStreaming = {
model: config.geminiChatTarget.model,
messages: chatMessages,
tools: getOpenAITools(),
temperature: 0.6,
stream: true,
};
const response = await geminiOpenAi.chat.completions.create(request, {signal}) as unknown as AsyncIterableStream<OpenAiChatCompletionStreamChunkLike>;
aiLog("debug", "openai_compatible.stream.open", {round});
// const streamToolCalls: OpenAiChatToolCallLike[] = [];
const roundTextStart = streamMessage.getText().length;
const toolCallAccumulator = new StreamingToolCallAccumulator("openai_chat_stream", round);
let calls: ToolCallData[] = [];
for await (const chunk of response) {
if (signal.aborted) throw new Error("Aborted");
const delta = chunk.choices?.[0]?.delta;
streamMessage.append(delta?.content ?? "");
if (delta?.tool_calls?.length) {
calls = toolCallAccumulator.add(delta.tool_calls);
streamMessage.setStatus(Environment.getUseToolText(calls));
await streamMessage.flush();
}
}
// const calls = collectOpenAiChatStreamToolCalls(streamToolCalls);
aiLog(calls.length ? "info" : "success", calls.length ? "openai_compatible.tool_calls" : "openai_compatible.run.done", {
round,
duration: calls.length ? aiLogDuration(roundStartedAt) : aiLogDuration(runnerStartedAt),
textChars: streamMessage.getText().slice(roundTextStart).length,
calls: calls.map(aiLogToolCall),
});
if (!calls.length) return;
const roundText = streamMessage.getText().slice(roundTextStart);
chatMessages.push({
role: "assistant",
content: roundText,
tool_calls: calls.map(call => ({
id: call.id,
type: "function",
function: {
name: call.name,
arguments: call.argumentsText,
},
})),
});
await appendOpenAiChatToolResults(chatMessages, calls, await executeToolBatch(calls, streamMessage, toolContext, toolMemory));
}
}
export class OpenAiProviderRunner {
static run = runOpenAi;
}
export class OpenAiCompatibleProviderRunner {
static run = runOpenAiCompatibleChat;
}