shitton
This commit is contained in:
@@ -8,6 +8,7 @@ import {StoredAttachment, StoredAttachmentKind} from "../model/stored-attachment
|
||||
import {performFFmpeg} from "../util/ffmpeg";
|
||||
import ffmpeg from "fluent-ffmpeg";
|
||||
import {AsyncSemaphore, KeyedAsyncLock} from "../util/async-lock";
|
||||
import {appLogger} from "../logging/logger";
|
||||
|
||||
export type AiDownloadedFile = {
|
||||
kind: StoredAttachmentKind;
|
||||
@@ -20,6 +21,7 @@ export type AiDownloadedFile = {
|
||||
|
||||
const cachePathLocks = new KeyedAsyncLock();
|
||||
const ffmpegSemaphore = new AsyncSemaphore(2);
|
||||
const logger = appLogger.child("attachments");
|
||||
|
||||
function safeFileName(value: string): string {
|
||||
return value.replace(/[\\/:*?"<>|\u0000-\u001F]/g, "_").slice(0, 180);
|
||||
@@ -90,31 +92,48 @@ function cachePathFor(kind: StoredAttachmentKind, fileUniqueId: string | undefin
|
||||
}
|
||||
|
||||
async function downloadToCache(kind: StoredAttachmentKind, fileId: string, fileName: string, mimeType?: string, fileUniqueId?: string): Promise<StoredAttachment | null> {
|
||||
const startedAt = Date.now();
|
||||
logger.debug("download.start", {kind, fileId, fileName, mimeType});
|
||||
const file = await bot.getFile({file_id: fileId});
|
||||
const finalFileName = fileNameWithExtension(fileName, mimeType, file.file_path);
|
||||
const location = cachePathFor(kind, fileUniqueId, fileId, finalFileName);
|
||||
|
||||
await cachePathLocks.runExclusive(location, async () => {
|
||||
if (fs.existsSync(location)) return;
|
||||
if (fs.existsSync(location)) {
|
||||
logger.trace("download.cache_hit", {kind, location});
|
||||
return;
|
||||
}
|
||||
|
||||
const buffer = await downloadTelegramFile(file.file_path);
|
||||
if (!buffer) return;
|
||||
if (!buffer) {
|
||||
logger.warn("download.empty", {kind, fileId, telegramFilePath: file.file_path});
|
||||
return;
|
||||
}
|
||||
|
||||
const tempLocation = `${location}.${process.pid}.${Date.now()}.tmp`;
|
||||
fs.mkdirSync(path.dirname(location), {recursive: true});
|
||||
fs.writeFileSync(tempLocation, buffer);
|
||||
fs.renameSync(tempLocation, location);
|
||||
logger.debug("download.saved", {kind, location, bytes: buffer.length, duration: logger.duration(startedAt)});
|
||||
});
|
||||
|
||||
return {kind, fileId, fileUniqueId, fileName: finalFileName, mimeType, cachePath: location};
|
||||
}
|
||||
|
||||
async function convertAudioToWav(input: string, output: string, noVideo = false): Promise<void> {
|
||||
const startedAt = Date.now();
|
||||
logger.debug("audio.convert.start", {input, output, noVideo});
|
||||
await cachePathLocks.runExclusive(output, async () => {
|
||||
if (fs.existsSync(output)) return;
|
||||
if (fs.existsSync(output)) {
|
||||
logger.trace("audio.convert.cache_hit", {output});
|
||||
return;
|
||||
}
|
||||
|
||||
await ffmpegSemaphore.runExclusive(async () => {
|
||||
if (fs.existsSync(output)) return;
|
||||
if (fs.existsSync(output)) {
|
||||
logger.trace("audio.convert.cache_hit", {output});
|
||||
return;
|
||||
}
|
||||
|
||||
const tempOutput = `${output}.${process.pid}.${Date.now()}.tmp.wav`;
|
||||
try {
|
||||
@@ -125,14 +144,16 @@ async function convertAudioToWav(input: string, output: string, noVideo = false)
|
||||
.toFormat("wav")
|
||||
.save(tempOutput)
|
||||
.on("progress", (progress) => {
|
||||
console.log("progress", progress);
|
||||
logger.trace("audio.convert.progress", {input, output, progress});
|
||||
});
|
||||
});
|
||||
fs.renameSync(tempOutput, output);
|
||||
logger.debug("audio.convert.done", {input, output, duration: logger.duration(startedAt)});
|
||||
} catch (e) {
|
||||
if (fs.existsSync(tempOutput)) {
|
||||
fs.rmSync(tempOutput, {force: true});
|
||||
}
|
||||
logger.error("audio.convert.failed", {input, output, error: e});
|
||||
throw e;
|
||||
}
|
||||
});
|
||||
@@ -140,7 +161,9 @@ async function convertAudioToWav(input: string, output: string, noVideo = false)
|
||||
}
|
||||
|
||||
export async function cacheMessageAttachments(msg: Message): Promise<StoredAttachment[]> {
|
||||
const startedAt = Date.now();
|
||||
const result: StoredAttachment[] = [];
|
||||
logger.debug("message.cache.start", {chatId: msg.chat?.id, messageId: msg.message_id});
|
||||
|
||||
try {
|
||||
if (msg.photo?.length) {
|
||||
@@ -202,10 +225,12 @@ export async function cacheMessageAttachments(msg: Message): Promise<StoredAttac
|
||||
logError(e);
|
||||
}
|
||||
|
||||
logger.debug("message.cache.done", {chatId: msg.chat?.id, messageId: msg.message_id, attachments: result.length, duration: logger.duration(startedAt)});
|
||||
return result;
|
||||
}
|
||||
|
||||
export function attachmentsToDownloadedFiles(attachments: StoredAttachment[]): AiDownloadedFile[] {
|
||||
logger.trace("downloaded_files.build", {attachments: attachments.length});
|
||||
return attachments
|
||||
.filter(attachment => fs.existsSync(attachment.cachePath))
|
||||
.map(attachment => ({
|
||||
@@ -219,6 +244,7 @@ export function attachmentsToDownloadedFiles(attachments: StoredAttachment[]): A
|
||||
}
|
||||
|
||||
export function cleanupDownloads(files: AiDownloadedFile[]): void {
|
||||
logger.trace("downloaded_files.cleanup", {files: files.length});
|
||||
// Files stay on disk in the message cache; drop in-memory buffers eagerly.
|
||||
for (const file of files) {
|
||||
file.buffer = Buffer.alloc(0);
|
||||
|
||||
Reference in New Issue
Block a user