utils: add shared locks, queues, rendering and message helpers

This commit is contained in:
2026-05-10 22:52:25 +03:00
parent d666244863
commit 4c2a5471df
11 changed files with 3046 additions and 1139 deletions
+6 -2
View File
@@ -3,6 +3,7 @@ import {CallbackQuery, InlineKeyboardButton} from "typescript-telegram-bot-api";
import {Requirements} from "./requirements";
import {bot} from "../index";
import {logError} from "../util/utils";
import {enqueueTelegramApiCall} from "../util/telegram-api-queue";
export abstract class CallbackCommand {
@@ -13,7 +14,7 @@ export abstract class CallbackCommand {
abstract execute(query: CallbackQuery): Promise<void>;
// eslint-disable-next-line @typescript-eslint/no-unused-vars
afterExecute(query: CallbackQuery): Promise<void> {
afterExecute(_query: CallbackQuery): Promise<void> {
return Promise.resolve();
}
@@ -23,7 +24,10 @@ export abstract class CallbackCommand {
}
async answerCallbackQuery(query: CallbackQuery): Promise<void> {
bot.answerCallbackQuery(this.getOptions(query)).catch(logError);
enqueueTelegramApiCall(
() => bot.answerCallbackQuery(this.getOptions(query)),
{method: "answerCallbackQuery", skipPerChatLimit: true}
).catch(logError);
}
asButton(): InlineKeyboardButton {
+2 -2
View File
@@ -51,8 +51,8 @@ export function createCommandRegExp(
argsMode === "none"
? "\\s*$"
: argsMode === "required"
? "\\s+([\\s\\S]+)\\s*$" // (3)=args обязателен
: "(?:\\s+([\\s\\S]+))?\\s*$"; // (3)=args опционален
? "\\s+([\\s\\S]+)\\s*$" // (3)=args required
: "(?:\\s+([\\s\\S]+))?\\s*$"; // (3)=args optional
return new RegExp(base + tail, "i");
}
+19 -1
View File
@@ -1,6 +1,24 @@
export type MessageImagePart = {
data: string;
mimeType: string;
}
export type MessageAudioPart = {
data: string;
mimeType: string;
}
export type MessagePart = {
bot: boolean;
name?: string;
langCode?: string;
userName?: string;
content: string;
images: string[];
images?: string[];
imageParts?: MessageImagePart[];
audios?: string[];
audioParts?: MessageAudioPart[];
documents?: string[];
videos?: string[];
videoNotes?: string[];
}
+76
View File
@@ -0,0 +1,76 @@
export class AsyncSemaphore {
private active = 0;
private readonly waiters: Array<() => void> = [];
constructor(private readonly maxActive: number) {
if (!Number.isInteger(maxActive) || maxActive < 1) {
throw new Error("AsyncSemaphore maxActive must be a positive integer.");
}
}
async runExclusive<T>(task: () => Promise<T> | T): Promise<T> {
await this.acquire();
try {
return await task();
} finally {
this.release();
}
}
private async acquire(): Promise<void> {
if (this.active < this.maxActive) {
this.active++;
return;
}
await new Promise<void>(resolve => {
this.waiters.push(resolve);
});
this.active++;
}
private release(): void {
this.active--;
const next = this.waiters.shift();
if (next) {
next();
}
}
}
export class KeyedAsyncLock {
private readonly chains = new Map<string, Promise<void>>();
async runExclusive<T>(key: string, task: () => Promise<T> | T): Promise<T> {
const previous = this.chains.get(key) ?? Promise.resolve();
let release!: () => void;
const current = new Promise<void>(resolve => {
release = resolve;
});
const tail = previous.then(() => current, () => current);
this.chains.set(key, tail);
await previous.catch(() => undefined);
try {
return await task();
} finally {
release();
if (this.chains.get(key) === tail) {
this.chains.delete(key);
}
}
}
}
export function createQueuedFunction() {
let chain = Promise.resolve();
return async function enqueue<T>(task: () => Promise<T> | T): Promise<T> {
const run = chain.then(task, task);
chain = run.then(() => undefined, () => undefined);
return run;
};
}
+9
View File
@@ -0,0 +1,9 @@
export class HtmlUtils {
static escape(input: string): string {
return input
.replace(/&/g, "&amp;")
.replace(/</g, "&lt;")
.replace(/>/g, "&gt;")
.replace(/"/g, "&quot;");
}
}
+27
View File
@@ -0,0 +1,27 @@
export function getLruMapValue<K, V>(map: Map<K, V>, key: K): V | undefined {
if (!map.has(key)) return undefined;
const value = map.get(key)!;
map.delete(key);
map.set(key, value);
return value;
}
export function setLruMapValue<K, V>(map: Map<K, V>, key: K, value: V, maxSize: number): void {
if (maxSize < 1) {
map.clear();
return;
}
if (map.has(key)) {
map.delete(key);
}
map.set(key, value);
while (map.size > maxSize) {
const oldestKey = map.keys().next();
if (oldestKey.done) return;
map.delete(oldestKey.value);
}
}
+728
View File
@@ -0,0 +1,728 @@
export type TelegramRenderMode = "draft" | "final";
export interface TelegramMarkdownV2RenderOptions {
/**
* draft:
* - useful for streaming/editMessageText
* - temporarily closes unfinished code blocks / inline code / bold
*
* final:
* - use after LLM finished generation
*/
mode?: TelegramRenderMode;
/**
* Used when the rendered message is empty.
*/
fallbackText?: string;
}
/**
* Main function.
*
* Flow:
* LLM Markdown-lite
* -> draft safety, if needed
* -> normalize unsupported Markdown
* -> parse Markdown-lite
* -> render valid Telegram MarkdownV2
*/
export function prepareTelegramMarkdownV2(
input: string,
options: TelegramMarkdownV2RenderOptions = {},
): string {
const mode = options.mode ?? "final";
const fallbackText = options.fallbackText ?? "…";
try {
const safeInput = mode === "draft"
? makePartialMarkdownLiteSafe(input)
: input;
const normalized = normalizeUnsupportedMarkdown(safeInput);
const ast = parseMarkdownLite(normalized);
const rendered = renderMarkdownV2(ast).trim();
return rendered || escapeMarkdownV2Text(fallbackText);
} catch {
const fallback = escapeMarkdownV2Text(input).trim();
return fallback || escapeMarkdownV2Text(fallbackText);
}
}
/**
* Useful for editMessageText fallback.
*/
export function prepareTelegramPlainMarkdownV2(input: string, fallbackText = "…"): string {
const escaped = escapeMarkdownV2Text(input).trim();
return escaped || escapeMarkdownV2Text(fallbackText);
}
/**
* Draft-safe mode for streaming.
*
* Fixes cases like:
*
* ```ts
* const x =
*
* or:
*
* *partial bold
*
* or:
*
* `partial code
*/
export function makePartialMarkdownLiteSafe(input: string): string {
let text = input.replace(/\r\n?/g, "\n");
if (isInsideFencedCodeBlock(text)) {
return closeUnclosedFencedCodeBlock(text);
}
return transformOutsideFencedCode(text, (outside) => {
let result = outside;
result = closeUnclosedInlineCode(result);
result = closeUnclosedBold(result);
return result;
});
}
/**
* Converts unsupported / annoying Markdown into simpler Markdown-lite.
*
* Does not transform fenced code blocks.
*/
export function normalizeUnsupportedMarkdown(input: string): string {
const text = input.replace(/\r\n?/g, "\n").trim();
return transformOutsideFencedCode(text, (raw) => {
let result = raw;
result = normalizeMarkdownTables(result);
result = result
// Images: ![alt](url) -> [alt](url)
.replace(/!\[([^\]\n]*)]\(([^)\n]+)\)/g, "[$1]($2)")
// Common Markdown bold -> Markdown-lite bold
.replace(/\*\*([^*\n]+)\*\*/g, "*$1*")
.replace(/__([^_\n]+)__/g, "*$1*")
.replace(/^`([^`\n]+)$/gm, (_, title: string) => {
const cleanTitle = title.trim();
return cleanTitle ? `*${cleanTitle}*` : "";
})
// Headings -> bold labels
.replace(/^#{1,6}\s+(.+)$/gm, (_, title: string) => {
const cleanTitle = title
.replace(/[*_`[\]()~>#+\-=|{}.!]/g, "")
.trim();
return cleanTitle ? `*${cleanTitle}*` : "";
})
// Horizontal rules
.replace(/^\s*(-{3,}|\*{3,}|_{3,})\s*$/gm, "")
// Task lists -> normal bullets
.replace(/^(\s*)[-*]\s+\[[ xX]]\s+/gm, "$1- ")
// HTML line breaks -> newline
.replace(/<br\s*\/?>/gi, "\n")
// Strip simple raw HTML tags, keep content
.replace(/<\/?(?:p|div|span|strong|b|em|i|u|s|del|code|pre)[^>]*>/gi, "")
// Too many blank lines
.replace(/\n{3,}/g, "\n\n");
return result.trim();
});
}
/**
* AST
*/
type InlineNode =
| { type: "text"; value: string }
| { type: "bold"; children: InlineNode[] }
| { type: "code"; value: string }
| { type: "link"; text: string; url: string };
type BlockNode =
| { type: "paragraph"; children: InlineNode[] }
| { type: "pre"; lang?: string; value: string }
| { type: "quote"; lines: InlineNode[][] };
/**
* Block parser:
* - fenced code blocks
* - quotes
* - paragraphs
*/
export function parseMarkdownLite(input: string): BlockNode[] {
const lines = input.replace(/\r\n?/g, "\n").split("\n");
const blocks: BlockNode[] = [];
let i = 0;
while (i < lines.length) {
const line = lines[i];
if (!line.trim()) {
i++;
continue;
}
const fenceStart = line.match(/^```\s*([^`]*)\s*$/);
if (fenceStart) {
const lang = sanitizeCodeLanguage(fenceStart[1]);
const body: string[] = [];
i++;
while (i < lines.length && !/^```\s*$/.test(lines[i])) {
body.push(lines[i]);
i++;
}
if (i < lines.length) {
i++;
}
blocks.push({
type: "pre",
lang,
value: body.join("\n"),
});
continue;
}
if (/^\s*>\s?/.test(line)) {
const quoteLines: InlineNode[][] = [];
while (i < lines.length && /^\s*>\s?/.test(lines[i])) {
const quoteLine = lines[i].replace(/^\s*>\s?/, "");
quoteLines.push(parseInlineMarkdownLite(quoteLine));
i++;
}
blocks.push({
type: "quote",
lines: quoteLines,
});
continue;
}
const paragraphLines: string[] = [];
while (
i < lines.length &&
lines[i].trim() &&
!/^```\s*([^`]*)\s*$/.test(lines[i]) &&
!/^\s*>\s?/.test(lines[i])
) {
paragraphLines.push(lines[i]);
i++;
}
if (paragraphLines.length === 0) {
paragraphLines.push(lines[i]);
i++;
}
blocks.push({
type: "paragraph",
children: parseInlineMarkdownLite(paragraphLines.join("\n")),
});
}
return blocks;
}
/**
* Inline parser:
* - *bold*
* - `code`
* - [text](url)
*
* This is intentionally not a full Markdown parser.
*/
export function parseInlineMarkdownLite(source: string): InlineNode[] {
const nodes: InlineNode[] = [];
let buffer = "";
let i = 0;
const flushText = () => {
if (buffer) {
nodes.push({ type: "text", value: buffer });
buffer = "";
}
};
while (i < source.length) {
const ch = source[i];
if (ch === "`") {
const end = findNextUnescaped(source, "`", i + 1);
if (end !== -1) {
flushText();
nodes.push({
type: "code",
value: source.slice(i + 1, end),
});
i = end + 1;
continue;
}
}
if (ch === "[") {
const labelEnd = findNextUnescaped(source, "]", i + 1);
if (labelEnd !== -1 && source[labelEnd + 1] === "(") {
const urlStart = labelEnd + 2;
const urlEnd = findMarkdownLinkEnd(source, urlStart);
if (urlEnd !== -1) {
const text = source.slice(i + 1, labelEnd).trim();
const url = source.slice(urlStart, urlEnd).trim();
if (text && isSafeUrl(url)) {
flushText();
nodes.push({
type: "link",
text,
url,
});
i = urlEnd + 1;
continue;
}
}
}
}
if (ch === "*" && canStartBold(source, i)) {
const end = findBoldEnd(source, i + 1);
if (end !== -1 && canEndBold(source, end)) {
const content = source.slice(i + 1, end);
if (content.trim()) {
flushText();
nodes.push({
type: "bold",
children: parseInlineMarkdownLite(content),
});
i = end + 1;
continue;
}
}
}
buffer += ch;
i++;
}
flushText();
return nodes;
}
/**
* MarkdownV2 renderer
*/
export function renderMarkdownV2(blocks: BlockNode[]): string {
return blocks
.map(renderBlockMarkdownV2)
.filter(Boolean)
.join("\n\n")
.trim();
}
function renderBlockMarkdownV2(block: BlockNode): string {
switch (block.type) {
case "paragraph":
return renderInlineMarkdownV2(block.children);
case "pre": {
const lang = block.lang ? block.lang : "";
const code = escapeMarkdownV2Code(block.value);
if (lang) {
return "```" + lang + "\n" + code + "\n```";
}
return "```\n" + code + "\n```";
}
case "quote":
return block.lines
.map((line) => ">" + renderInlineMarkdownV2(line))
.join("\n");
}
}
function renderInlineMarkdownV2(nodes: InlineNode[]): string {
return nodes.map(renderInlineNodeMarkdownV2).join("");
}
function renderInlineNodeMarkdownV2(node: InlineNode): string {
switch (node.type) {
case "text":
return escapeMarkdownV2Text(node.value);
case "bold":
return "*" + renderInlineMarkdownV2(node.children) + "*";
case "code":
return "`" + escapeMarkdownV2Code(node.value) + "`";
case "link":
return `[${escapeMarkdownV2Text(node.text)}](${escapeMarkdownV2LinkUrl(node.url)})`;
}
}
/**
* Telegram MarkdownV2 escaping
*/
export function escapeMarkdownV2Text(value: string): string {
return value
.replace(/\\/g, "\\\\")
.replace(/([_*\[\]()~`>#+\-=|{}.!])/g, "\\$1");
}
export function escapeMarkdownV2Code(value: string): string {
return value
.replace(/\\/g, "\\\\")
.replace(/`/g, "\\`");
}
export function escapeMarkdownV2LinkUrl(value: string): string {
return value
.replace(/\\/g, "\\\\")
.replace(/\)/g, "\\)");
}
/**
* Draft safety helpers
*/
function closeUnclosedFencedCodeBlock(input: string): string {
if (!isInsideFencedCodeBlock(input)) {
return input;
}
return input.endsWith("\n")
? input + "```"
: input + "\n```";
}
function isInsideFencedCodeBlock(input: string): boolean {
const fenceMatches = [...input.matchAll(/^```/gm)];
return fenceMatches.length % 2 === 1;
}
function closeUnclosedInlineCode(input: string): string {
let count = 0;
let escaped = false;
for (const ch of input) {
if (escaped) {
escaped = false;
continue;
}
if (ch === "\\") {
escaped = true;
continue;
}
if (ch === "`") {
count++;
}
}
return count % 2 === 1 ? input + "`" : input;
}
function closeUnclosedBold(input: string): string {
let count = 0;
let escaped = false;
for (let i = 0; i < input.length; i++) {
const ch = input[i];
if (escaped) {
escaped = false;
continue;
}
if (ch === "\\") {
escaped = true;
continue;
}
if (ch !== "*") {
continue;
}
if (isLikelyListMarker(input, i)) {
continue;
}
count++;
}
return count % 2 === 1 ? input + "*" : input;
}
function isLikelyListMarker(input: string, index: number): boolean {
const prev = input[index - 1];
const next = input[index + 1];
const isLineStart = index === 0 || prev === "\n";
return isLineStart && next === " ";
}
/**
* Generic helpers
*/
function findNextUnescaped(source: string, target: string, from: number): number {
for (let i = from; i < source.length; i++) {
if (source[i] === "\\" && i + 1 < source.length) {
i++;
continue;
}
if (source[i] === target) {
return i;
}
}
return -1;
}
function findBoldEnd(source: string, from: number): number {
for (let i = from; i < source.length; i++) {
if (source[i] === "\\" && i + 1 < source.length) {
i++;
continue;
}
if (source[i] === "*") {
return i;
}
}
return -1;
}
function findMarkdownLinkEnd(source: string, from: number): number {
let depth = 0;
for (let i = from; i < source.length; i++) {
const ch = source[i];
if (ch === "\\" && i + 1 < source.length) {
i++;
continue;
}
if (ch === "\n") {
return -1;
}
if (ch === "(") {
depth++;
continue;
}
if (ch === ")") {
if (depth === 0) {
return i;
}
depth--;
}
}
return -1;
}
function canStartBold(source: string, index: number): boolean {
const prev = source[index - 1];
const next = source[index + 1];
if (!next || /\s/.test(next)) {
return false;
}
if (prev && /\w/.test(prev) && /\w/.test(next)) {
return false;
}
return true;
}
function canEndBold(source: string, index: number): boolean {
const prev = source[index - 1];
const next = source[index + 1];
if (!prev || /\s/.test(prev)) {
return false;
}
if (next && /\w/.test(prev) && /\w/.test(next)) {
return false;
}
return true;
}
function sanitizeCodeLanguage(value: string | undefined): string | undefined {
if (!value) return undefined;
const lang = value.trim();
if (!lang) return undefined;
// Telegram language hint after ``` can be used as a visual label too.
// Keep it permissive, but reject dangerous/newline/weird marker chars.
if (!/^[^\s`\\]{1,32}$/.test(lang)) {
return undefined;
}
return lang;
}
function isSafeUrl(url: string): boolean {
return /^(https?:\/\/|tg:\/\/|mailto:)/i.test(url);
}
/**
* Applies transform only outside fenced code blocks.
*/
function transformOutsideFencedCode(
input: string,
transform: (text: string) => string,
): string {
const fences: string[] = [];
const fenceRegex = /```[^\n]*\n[\s\S]*?(?:\n```|$)/g;
const protectedText = input.replace(fenceRegex, (match) => {
const index = fences.push(match) - 1;
return `\uE000FENCE_${index}\uE001`;
});
const transformed = transform(protectedText);
return transformed.replace(/\uE000FENCE_(\d+)\uE001/g, (_, index: string) => {
return fences[Number(index)] ?? "";
});
}
/**
* Converts Markdown tables into simple list rows.
*
* Example:
* | A | B |
* |---|---|
* | 1 | 2 |
*
* ->
* - A: 1; B: 2
*/
function normalizeMarkdownTables(input: string): string {
const lines = input.split("\n");
const output: string[] = [];
let i = 0;
while (i < lines.length) {
const current = lines[i];
const next = lines[i + 1];
if (next && isMarkdownTableSeparator(next) && current.includes("|")) {
const headers = parseTableRow(current);
const rows: string[][] = [];
i += 2;
while (i < lines.length && lines[i].includes("|") && lines[i].trim()) {
rows.push(parseTableRow(lines[i]));
i++;
}
if (rows.length === 0) {
output.push(headers.join(" / "));
continue;
}
for (const row of rows) {
const cells = row
.map((cell, index) => {
const header = headers[index];
if (!cell) return "";
if (!header) return cell;
return `${header}: ${cell}`;
})
.filter(Boolean);
output.push(`- ${cells.join("; ")}`);
}
continue;
}
output.push(current);
i++;
}
return output.join("\n");
}
function isMarkdownTableSeparator(line: string): boolean {
const cells = parseTableRow(line);
return (
cells.length >= 2 &&
cells.every((cell) => /^:?-{3,}:?$/.test(cell.trim()))
);
}
function parseTableRow(line: string): string[] {
return line
.trim()
.replace(/^\|/, "")
.replace(/\|$/, "")
.split("|")
.map((cell) => cell.trim());
}
/**
* Optional helper for streaming/editing.
*
* You can adapt this to your own bot wrapper.
*/
export function shouldEditRenderedMessage(previous: string, next: string): boolean {
return previous !== next && next.trim().length > 0;
}
+14
View File
@@ -0,0 +1,14 @@
export class RandomUtils {
static int(max: number): number {
return Math.floor(Math.random() * Math.floor(max));
}
static rangedInt(from: number, to: number): number {
return RandomUtils.int(to - from) + from;
}
static value<T>(list: readonly T[]): T | undefined {
if (!list.length) return undefined;
return list[RandomUtils.int(list.length)];
}
}
+90
View File
@@ -0,0 +1,90 @@
import {exec} from "node:child_process";
import {promisify} from "node:util";
const execAsync = promisify(exec);
export type ShellCommandResult = {
stdout: string | null | undefined;
stderr: string | null | undefined;
};
export class ShellCommandRunner {
private static readonly forbiddenPatterns = [
/\bsudo\b/,
/\bsu\b/,
/\brm\b/,
/\brmdir\b/,
/\bchmod\b/,
/\bchown\b/,
/\bdd\b/,
/\bmkfs\b/,
/\bmount\b/,
/\bumount\b/,
/\breboot\b/,
/\bshutdown\b/,
/\bkill\b/,
/\bdel\b/i,
/\berase\b/i,
/\brd\b/i,
/\bformat\b/i,
/\btaskkill\b/i,
/\bRemove-Item\b/i,
/\bMove-Item\b/i,
/\bStop-Process\b/i,
/\bRestart-Computer\b/i,
/\bStop-Computer\b/i,
/\bcurl\b/,
/\bwget\b/,
/\bInvoke-WebRequest\b/i,
/\bInvoke-RestMethod\b/i,
/\bssh\b/,
/\bscp\b/,
/\brsync\b/,
/\bnc\b/,
/\bnmap\b/,
/\.\./,
/\/etc\/?/,
/\/home\/?/,
/\/root\/?/,
/~\//,
/\.ssh/,
/\.env/,
];
static async run(command: string): Promise<ShellCommandResult> {
ShellCommandRunner.assertSafe(command);
try {
const {stdout, stderr} = await execAsync(command, {
timeout: 15_000,
maxBuffer: 64 * 1024,
});
if (stdout) {
console.log("COMMAND: ", command, "\n", "Output:", stdout);
}
if (stderr) {
console.error("COMMAND: ", command, "\n", "Error:", stderr);
}
return {stdout, stderr};
} catch (error: any) {
console.error("Error code:", error.code);
console.error("Stderr:", error.stderr);
return {stdout: error.stdout ?? null, stderr: error.stderr ?? error.message};
}
}
private static assertSafe(command: string): void {
if (command.length > 500) {
throw new Error("Command is too long");
}
for (const pattern of ShellCommandRunner.forbiddenPatterns) {
if (pattern.test(command)) {
throw new Error(`Forbidden shell command pattern: ${pattern}`);
}
}
}
}
+700
View File
@@ -0,0 +1,700 @@
/**
* Conservative Telegram Bot API promise queue.
*
* Defaults intentionally prefer safety over throughput:
* - global bot limit: 30 requests / second;
* - per-chat limit: 1 request / second;
* - likely group/channel chats: 20 requests / minute;
* - edit methods: 6 requests / second.
*
* Telegram can still return 429 for dynamic flood limits. In that case the
* queue always honors `parameters.retry_after` and requeues the task.
*/
export type TelegramChatId = number | string;
export type TelegramChatType = string;
export type TelegramApiTaskContext = {
attempt: number;
signal?: AbortSignal;
};
export type TelegramApiTask<T> = (context: TelegramApiTaskContext) => Promise<T>;
export type RateLimitConfig = {
maxRequests: number;
intervalMs: number;
};
export type TelegramApiQueueTaskOptions = {
chatId?: TelegramChatId;
chatType?: TelegramChatType;
method?: string;
priority?: number;
maxAttempts?: number;
signal?: AbortSignal;
skipPerChatLimit?: boolean;
};
export type TelegramApiRetryEvent = {
taskId: number;
method?: string;
chatId?: TelegramChatId;
attempt: number;
delayMs: number;
reason: "telegram_retry_after" | "transient_error";
error: unknown;
};
export type TelegramApiQueueOptions = {
globalLimit?: Partial<RateLimitConfig>;
perChatLimit?: Partial<RateLimitConfig>;
groupChatLimit?: Partial<RateLimitConfig>;
editLimit?: Partial<RateLimitConfig>;
maxConcurrent?: number;
maxAttempts?: number;
baseRetryDelayMs?: number;
maxRetryDelayMs?: number;
retryJitterMs?: number;
retryAfterSafetyMs?: number;
maxQueueSize?: number;
onRetry?: (event: TelegramApiRetryEvent) => void;
};
export type TelegramApiQueueStats = {
queued: number;
running: number;
closed: boolean;
};
type RetryDecision = {
delayMs: number;
reason: TelegramApiRetryEvent["reason"];
};
type QueueEntryState = "queued" | "running" | "settled" | "cancelled";
type QueueEntry<T> = {
id: number;
sequence: number;
task: TelegramApiTask<T>;
options: TelegramApiQueueTaskOptions;
attempt: number;
notBefore: number;
state: QueueEntryState;
resolve: (value: T | PromiseLike<T>) => void;
reject: (reason?: unknown) => void;
abortHandler?: () => void;
};
type ResolvedTelegramApiQueueOptions = {
globalLimit: RateLimitConfig;
perChatLimit: RateLimitConfig;
groupChatLimit: RateLimitConfig;
editLimit: RateLimitConfig;
maxConcurrent: number;
maxAttempts: number;
baseRetryDelayMs: number;
maxRetryDelayMs: number;
retryJitterMs: number;
retryAfterSafetyMs: number;
maxQueueSize: number;
onRetry?: (event: TelegramApiRetryEvent) => void;
};
const DEFAULT_OPTIONS: ResolvedTelegramApiQueueOptions = {
globalLimit: {maxRequests: 30, intervalMs: 1000},
perChatLimit: {maxRequests: 1, intervalMs: 1000},
groupChatLimit: {maxRequests: 20, intervalMs: 60_000},
editLimit: {maxRequests: 6, intervalMs: 1000},
maxConcurrent: 8,
maxAttempts: 5,
baseRetryDelayMs: 500,
maxRetryDelayMs: 30_000,
retryJitterMs: 250,
retryAfterSafetyMs: 250,
maxQueueSize: 10_000,
};
function mergeLimitConfig(base: RateLimitConfig, override?: Partial<RateLimitConfig>): RateLimitConfig {
return {
maxRequests: override?.maxRequests ?? base.maxRequests,
intervalMs: override?.intervalMs ?? base.intervalMs,
};
}
function resolveOptions(options: TelegramApiQueueOptions): ResolvedTelegramApiQueueOptions {
return {
globalLimit: mergeLimitConfig(DEFAULT_OPTIONS.globalLimit, options.globalLimit),
perChatLimit: mergeLimitConfig(DEFAULT_OPTIONS.perChatLimit, options.perChatLimit),
groupChatLimit: mergeLimitConfig(DEFAULT_OPTIONS.groupChatLimit, options.groupChatLimit),
editLimit: mergeLimitConfig(DEFAULT_OPTIONS.editLimit, options.editLimit),
maxConcurrent: options.maxConcurrent ?? DEFAULT_OPTIONS.maxConcurrent,
maxAttempts: options.maxAttempts ?? DEFAULT_OPTIONS.maxAttempts,
baseRetryDelayMs: options.baseRetryDelayMs ?? DEFAULT_OPTIONS.baseRetryDelayMs,
maxRetryDelayMs: options.maxRetryDelayMs ?? DEFAULT_OPTIONS.maxRetryDelayMs,
retryJitterMs: options.retryJitterMs ?? DEFAULT_OPTIONS.retryJitterMs,
retryAfterSafetyMs: options.retryAfterSafetyMs ?? DEFAULT_OPTIONS.retryAfterSafetyMs,
maxQueueSize: options.maxQueueSize ?? DEFAULT_OPTIONS.maxQueueSize,
onRetry: options.onRetry,
};
}
function createAbortError(): Error {
const error = new Error("Telegram API queue task aborted");
error.name = "AbortError";
return error;
}
function createClosedError(): Error {
return new Error("Telegram API queue is closed");
}
function createQueueOverflowError(maxQueueSize: number): Error {
return new Error(`Telegram API queue overflow: maxQueueSize=${maxQueueSize}`);
}
function delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null;
}
function readPath(source: unknown, pathParts: readonly string[]): unknown {
let current = source;
for (const part of pathParts) {
if (!isRecord(current)) return undefined;
current = current[part];
}
return current;
}
function readNumber(source: unknown, paths: readonly (readonly string[])[]): number | undefined {
for (const pathParts of paths) {
const value = readPath(source, pathParts);
if (typeof value === "number" && Number.isFinite(value)) return value;
if (typeof value === "string") {
const parsed = Number(value);
if (Number.isFinite(parsed)) return parsed;
}
}
return undefined;
}
function readString(source: unknown, paths: readonly (readonly string[])[]): string | undefined {
for (const pathParts of paths) {
const value = readPath(source, pathParts);
if (typeof value === "string") return value;
}
return undefined;
}
function extractRetryAfterMs(error: unknown, safetyMs: number): number | undefined {
const retryAfterSeconds = readNumber(error, [
["parameters", "retry_after"],
["response", "parameters", "retry_after"],
["response", "body", "parameters", "retry_after"],
["body", "parameters", "retry_after"],
]);
if (retryAfterSeconds === undefined) return undefined;
return Math.max(0, Math.ceil(retryAfterSeconds * 1000) + safetyMs);
}
function extractStatusCode(error: unknown): number | undefined {
return readNumber(error, [
["error_code"],
["errorCode"],
["status"],
["statusCode"],
["response", "error_code"],
["response", "status"],
["response", "statusCode"],
["response", "body", "error_code"],
["body", "error_code"],
]);
}
function extractErrorCode(error: unknown): string | undefined {
return readString(error, [
["code"],
["errno"],
["cause", "code"],
]);
}
function extractErrorMessage(error: unknown): string {
if (error instanceof Error) return error.message;
if (typeof error === "string") return error;
const message = readString(error, [
["message"],
["description"],
["response", "description"],
["response", "body", "description"],
["body", "description"],
]);
return message ?? "";
}
function isTelegramTooManyRequests(error: unknown): boolean {
return extractStatusCode(error) === 429 || /too many requests|retry after/i.test(extractErrorMessage(error));
}
function isTransientError(error: unknown): boolean {
const statusCode = extractStatusCode(error);
if (statusCode !== undefined) {
if (statusCode === 408) return true;
if (statusCode >= 500 && statusCode <= 599) return true;
if (statusCode >= 400 && statusCode <= 499) return false;
}
const code = extractErrorCode(error);
if (code && ["ETIMEDOUT", "ECONNRESET", "ECONNABORTED", "EAI_AGAIN", "ENOTFOUND", "EPIPE"].includes(code)) {
return true;
}
return /timeout|socket hang up|network error|econnreset|econnaborted|eai_again/i.test(extractErrorMessage(error));
}
function isLikelyGroupChatId(chatId: TelegramChatId | undefined): boolean {
if (typeof chatId === "number") return chatId < 0;
if (typeof chatId === "string") return chatId.startsWith("-");
return false;
}
function isGroupLikeChat(chatType: TelegramChatType | undefined, chatId: TelegramChatId | undefined): boolean {
if (chatType === "group" || chatType === "supergroup" || chatType === "channel") return true;
if (chatType === "private") return false;
return isLikelyGroupChatId(chatId);
}
function isEditMethod(method: string | undefined): boolean {
return !!method && method.toLowerCase().startsWith("edit");
}
function normalizeBucketKey(value: TelegramChatId): string {
return String(value);
}
class SlidingWindowRateLimit {
private timestamps: number[] = [];
private pausedUntil = 0;
private lastTouched = Date.now();
constructor(private readonly config: RateLimitConfig) {
}
nextDelay(now: number): number {
this.lastTouched = now;
this.prune(now);
const pauseDelay = Math.max(0, this.pausedUntil - now);
if (pauseDelay > 0) return pauseDelay;
if (this.timestamps.length < this.config.maxRequests) return 0;
const oldest = this.timestamps[0] ?? now;
return Math.max(0, oldest + this.config.intervalMs - now);
}
record(now: number): void {
this.lastTouched = now;
this.prune(now);
this.timestamps.push(now);
}
pause(delayMs: number, now: number): void {
this.lastTouched = now;
this.pausedUntil = Math.max(this.pausedUntil, now + delayMs);
}
isIdle(now: number, idleMs: number): boolean {
this.prune(now);
return this.timestamps.length === 0
&& this.pausedUntil <= now
&& now - this.lastTouched >= idleMs;
}
private prune(now: number): void {
const minTime = now - this.config.intervalMs;
while (this.timestamps.length && (this.timestamps[0] ?? now) <= minTime) {
this.timestamps.shift();
}
}
}
export class TelegramApiQueue {
private readonly options: ResolvedTelegramApiQueueOptions;
private readonly globalBucket: SlidingWindowRateLimit;
private readonly editBucket: SlidingWindowRateLimit;
private readonly chatBuckets = new Map<string, SlidingWindowRateLimit>();
private readonly groupChatBuckets = new Map<string, SlidingWindowRateLimit>();
private readonly idleResolvers: Array<() => void> = [];
private readonly bucketIdleMs: number;
private queue: Array<QueueEntry<unknown>> = [];
private timer: NodeJS.Timeout | null = null;
private running = 0;
private nextId = 1;
private nextSequence = 1;
private closed = false;
constructor(options: TelegramApiQueueOptions = {}) {
this.options = resolveOptions(options);
this.globalBucket = new SlidingWindowRateLimit(this.options.globalLimit);
this.editBucket = new SlidingWindowRateLimit(this.options.editLimit);
this.bucketIdleMs = Math.max(this.options.perChatLimit.intervalMs, this.options.groupChatLimit.intervalMs) * 2;
}
get stats(): TelegramApiQueueStats {
return {
queued: this.queue.length,
running: this.running,
closed: this.closed,
};
}
enqueue<T>(task: TelegramApiTask<T>, options: TelegramApiQueueTaskOptions = {}): Promise<T> {
if (this.closed) return Promise.reject(createClosedError());
if (this.queue.length >= this.options.maxQueueSize) return Promise.reject(createQueueOverflowError(this.options.maxQueueSize));
if (options.signal?.aborted) return Promise.reject(createAbortError());
return new Promise<T>((resolve, reject) => {
const entry: QueueEntry<T> = {
id: this.nextId++,
sequence: this.nextSequence++,
task,
options,
attempt: 1,
notBefore: Date.now(),
state: "queued",
resolve,
reject,
};
this.attachAbortHandler(entry);
this.insertEntry(entry as QueueEntry<unknown>);
this.pump();
});
}
waitUntilIdle(): Promise<void> {
if (this.queue.length === 0 && this.running === 0) return Promise.resolve();
return new Promise(resolve => {
this.idleResolvers.push(resolve);
});
}
close(reason: unknown = createClosedError()): void {
this.closed = true;
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
const queued = this.queue;
this.queue = [];
for (const entry of queued) {
this.cleanupAbortHandler(entry);
entry.state = "cancelled";
entry.reject(reason);
}
this.chatBuckets.clear();
this.groupChatBuckets.clear();
this.resolveIdleIfNeeded();
}
clear(reason: unknown = new Error("Telegram API queue was cleared")): void {
const queued = this.queue;
this.queue = [];
for (const entry of queued) {
this.cleanupAbortHandler(entry);
entry.state = "cancelled";
entry.reject(reason);
}
this.resolveIdleIfNeeded();
}
private insertEntry(entry: QueueEntry<unknown>): void {
this.queue.push(entry);
this.queue.sort((left, right) => {
const priorityDiff = (right.options.priority ?? 0) - (left.options.priority ?? 0);
return priorityDiff || left.sequence - right.sequence;
});
}
private abortQueuedEntry(taskId: number): void {
const index = this.queue.findIndex(entry => entry.id === taskId);
if (index < 0) return;
const entry = this.queue.splice(index, 1)[0];
if (!entry) return;
this.cleanupAbortHandler(entry);
entry.state = "cancelled";
entry.reject(createAbortError());
this.resolveIdleIfNeeded();
}
private pump(): void {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
if (this.closed) return;
this.cleanupIdleBuckets();
while (this.running < this.options.maxConcurrent) {
const selection = this.selectNextEntry(Date.now());
if (!selection) {
this.resolveIdleIfNeeded();
return;
}
if (selection.delayMs > 0) {
this.schedule(selection.delayMs);
return;
}
const entry = this.queue.splice(selection.index, 1)[0];
if (!entry) continue;
this.startEntry(entry);
}
}
private selectNextEntry(now: number): { index: number; delayMs: number } | null {
let bestBlockedIndex = -1;
let bestBlockedDelay = Number.POSITIVE_INFINITY;
for (let index = 0; index < this.queue.length; index++) {
const entry = this.queue[index];
if (!entry) continue;
if (entry.options.signal?.aborted) {
this.abortQueuedEntry(entry.id);
index--;
continue;
}
const delayMs = this.nextDelayFor(entry, now);
if (delayMs === 0) return {index, delayMs};
if (delayMs < bestBlockedDelay) {
bestBlockedDelay = delayMs;
bestBlockedIndex = index;
}
}
if (bestBlockedIndex < 0) return null;
return {index: bestBlockedIndex, delayMs: bestBlockedDelay};
}
private startEntry(entry: QueueEntry<unknown>): void {
entry.state = "running";
this.cleanupAbortHandler(entry);
this.recordStart(entry, Date.now());
this.running++;
void this.runEntry(entry);
}
private async runEntry(entry: QueueEntry<unknown>): Promise<void> {
try {
if (entry.options.signal?.aborted) throw createAbortError();
const result = await entry.task({
attempt: entry.attempt,
signal: entry.options.signal,
});
entry.state = "settled";
entry.resolve(result);
} catch (error) {
const retry = this.getRetryDecision(error, entry);
if (retry && !this.closed) {
this.applyRetryPause(entry, retry);
entry.attempt++;
entry.notBefore = Date.now() + retry.delayMs;
entry.state = "queued";
if (entry.options.signal?.aborted) {
entry.state = "cancelled";
entry.reject(createAbortError());
} else {
this.attachAbortHandler(entry);
this.insertEntry(entry);
this.options.onRetry?.({
taskId: entry.id,
method: entry.options.method,
chatId: entry.options.chatId,
attempt: entry.attempt - 1,
delayMs: retry.delayMs,
reason: retry.reason,
error,
});
}
} else {
entry.state = "settled";
entry.reject(this.closed ? createClosedError() : error);
}
} finally {
this.running--;
this.pump();
}
}
private nextDelayFor(entry: QueueEntry<unknown>, now: number): number {
const explicitDelay = Math.max(0, entry.notBefore - now);
const bucketDelay = this.bucketsFor(entry).reduce((maxDelay, bucket) => {
return Math.max(maxDelay, bucket.nextDelay(now));
}, 0);
return Math.max(explicitDelay, bucketDelay);
}
private recordStart(entry: QueueEntry<unknown>, now: number): void {
for (const bucket of this.bucketsFor(entry)) {
bucket.record(now);
}
}
private bucketsFor(entry: QueueEntry<unknown>): SlidingWindowRateLimit[] {
const buckets = [this.globalBucket];
const chatId = entry.options.chatId;
if (chatId !== undefined && !entry.options.skipPerChatLimit) {
buckets.push(this.getChatBucket(chatId));
if (isGroupLikeChat(entry.options.chatType, chatId)) {
buckets.push(this.getGroupChatBucket(chatId));
}
}
if (isEditMethod(entry.options.method)) {
buckets.push(this.editBucket);
}
return buckets;
}
private getChatBucket(chatId: TelegramChatId): SlidingWindowRateLimit {
const key = normalizeBucketKey(chatId);
let bucket = this.chatBuckets.get(key);
if (!bucket) {
bucket = new SlidingWindowRateLimit(this.options.perChatLimit);
this.chatBuckets.set(key, bucket);
}
return bucket;
}
private getGroupChatBucket(chatId: TelegramChatId): SlidingWindowRateLimit {
const key = normalizeBucketKey(chatId);
let bucket = this.groupChatBuckets.get(key);
if (!bucket) {
bucket = new SlidingWindowRateLimit(this.options.groupChatLimit);
this.groupChatBuckets.set(key, bucket);
}
return bucket;
}
private getRetryDecision(error: unknown, entry: QueueEntry<unknown>): RetryDecision | null {
if (entry.options.signal?.aborted) return null;
const maxAttempts = entry.options.maxAttempts ?? this.options.maxAttempts;
if (entry.attempt >= maxAttempts) return null;
const retryAfterMs = extractRetryAfterMs(error, this.options.retryAfterSafetyMs);
if (retryAfterMs !== undefined || isTelegramTooManyRequests(error)) {
return {
delayMs: retryAfterMs ?? this.backoffDelay(entry.attempt),
reason: "telegram_retry_after",
};
}
if (!isTransientError(error)) return null;
return {
delayMs: this.backoffDelay(entry.attempt),
reason: "transient_error",
};
}
private backoffDelay(attempt: number): number {
const exponential = this.options.baseRetryDelayMs * (2 ** Math.max(0, attempt - 1));
const capped = Math.min(this.options.maxRetryDelayMs, exponential);
const jitter = this.options.retryJitterMs > 0 ? Math.floor(Math.random() * this.options.retryJitterMs) : 0;
return capped + jitter;
}
private applyRetryPause(entry: QueueEntry<unknown>, retry: RetryDecision): void {
if (retry.reason !== "telegram_retry_after") return;
const now = Date.now();
for (const bucket of this.bucketsFor(entry)) {
bucket.pause(retry.delayMs, now);
}
}
private schedule(delayMs: number): void {
const safeDelay = Math.max(0, Math.min(delayMs, 2_147_483_647));
this.timer = setTimeout(() => {
this.timer = null;
this.pump();
}, safeDelay);
}
private attachAbortHandler<T>(entry: QueueEntry<T>): void {
if (!entry.options.signal || entry.abortHandler) return;
entry.abortHandler = () => this.abortQueuedEntry(entry.id);
entry.options.signal.addEventListener("abort", entry.abortHandler, {once: true});
}
private cleanupAbortHandler<T>(entry: QueueEntry<T>): void {
if (!entry.abortHandler) return;
entry.options.signal?.removeEventListener("abort", entry.abortHandler);
entry.abortHandler = undefined;
}
private resolveIdleIfNeeded(): void {
if (this.queue.length !== 0 || this.running !== 0) return;
this.cleanupIdleBuckets();
const resolvers = this.idleResolvers.splice(0);
for (const resolve of resolvers) {
resolve();
}
}
private cleanupIdleBuckets(now = Date.now()): void {
for (const [key, bucket] of this.chatBuckets) {
if (bucket.isIdle(now, this.bucketIdleMs)) {
this.chatBuckets.delete(key);
}
}
for (const [key, bucket] of this.groupChatBuckets) {
if (bucket.isIdle(now, this.bucketIdleMs)) {
this.groupChatBuckets.delete(key);
}
}
}
}
export const telegramApiQueue = new TelegramApiQueue();
export async function enqueueTelegramApi<T>(
task: TelegramApiTask<T>,
options?: TelegramApiQueueTaskOptions
): Promise<T> {
return telegramApiQueue.enqueue(task, options);
}
export async function enqueueTelegramApiCall<T>(
task: () => Promise<T>,
options?: TelegramApiQueueTaskOptions
): Promise<T> {
return telegramApiQueue.enqueue(() => task(), options);
}
export async function sleepForTelegramRetry(ms: number): Promise<void> {
await delay(ms);
}
+1372 -1131
View File
File diff suppressed because it is too large Load Diff