import { failedRequestCounter, requestDurationHistogram, successfulRequestCounter, totalRequestCounter } from "./lib/metricsCounters"; import { coalesce } from "./lib/initTools"; import { createLogger } from "./lib/logger"; import { serializeError } from "./lib/serializeError"; import { doWork } from "./emailWorker"; /** time between two pull operations */ const PULL_INTERVAL = parseInt(coalesce(process.env.PULL_INTERVAL, "10000")); const consoleLog = createLogger("server:server"); /** Writes entry to log */ const logWrite = (logTitle:string, logMessage:string) => { consoleLog(`${logTitle}: ${logMessage}}`); } /** Writes error to log */ const logError = (ex: any) => logWrite(serializeError(ex), "error"); /** * zastavica za zaustavljanje sinhronizacije */ let disposed:boolean = false; /** is worker started - prevents multiple starts */ let workerStarted:boolean = false; /** Promise which is resolved once the pending work in progress is completed */ let pendingWork:Promise|undefined; /** Worker re-run timeout */ let pendingTimeout:NodeJS.Timeout|undefined; /** Enumeracija pojedinih statusa obrade jednog work-a */ export enum WorkerRunnerStatus { init="init", disposed="disposed", beginWork="beginWork", updatedStats1="updatedStats1", updatedStats2="updatedStats2", stoppedStatTimer="stoppedStatTimer", workDone="workDone", newIntervalScheduled="newIntervalScheduled", currentWorkResolved="currentWorkResolved", } /** Info o statusu workera */ export type WorkerRunnerInfo = { /** zadnje izvršena readnja */ status: WorkerRunnerStatus, /** vrijeme kada je worker zadnji puta pokrenut */ lastWorkTime: number, } /** Info o statusu workera, koji koristi healthcheck kako bi vidio da li stvar funkcionira */ export const workerRunnerInfo:WorkerRunnerInfo = { status: WorkerRunnerStatus.init, lastWorkTime: Date.now() } export const workRunner = async () => { pendingTimeout = undefined; workerRunnerInfo.lastWorkTime = Date.now(); workerRunnerInfo.status = WorkerRunnerStatus.beginWork; // AKO je modul zaustavljen // -> nemoj se pokrenuti if(disposed) { workerRunnerInfo.status = WorkerRunnerStatus.disposed; return; } // kreiram Promise koji omogućuje da dispose zna // pričekati da worker završi sa poslom (ako je u tom trenutku aktivan) pendingWork = new Promise(async (resolve) => { try { totalRequestCounter.inc(); const stopPrometheusTimer = requestDurationHistogram.startTimer(); workerRunnerInfo.status = WorkerRunnerStatus.updatedStats1; try { // ne dopuštam da stvar sruši worker await doWork(); workerRunnerInfo.status = WorkerRunnerStatus.workDone; // ažuriram statistiku successfulRequestCounter.inc(); workerRunnerInfo.status = WorkerRunnerStatus.updatedStats2; } catch(ex:any) { // ažuriram statistiku failedRequestCounter.inc(); logError(ex); } stopPrometheusTimer(); workerRunnerInfo.status = WorkerRunnerStatus.stoppedStatTimer; } catch(ex:any) { logError(ex); } // nemoj pokrenuti timer ako je worker u međuvremenu disposed if(!disposed) { // pull again after timeout pendingTimeout = setTimeout(workRunner, PULL_INTERVAL); workerRunnerInfo.status = WorkerRunnerStatus.newIntervalScheduled; } else { logWrite("Info", "... exiting worker loop"); } resolve(); workerRunnerInfo.status = WorkerRunnerStatus.currentWorkResolved; pendingWork = undefined; }); // this is an async function which must return a promise // > so return the promise which will be resolved once the work is done return(pendingWork); }; /** * Starts the worker */ export const startSyncWorker = () => { if(!workerStarted && !disposed) { workerStarted = true; workRunner(); logWrite("Info", "Worker Started"); }; } /** * Stops and disposes the worker */ export const disposeSyncWorker = async () => { logWrite("Info", "Disposing worker ..."); disposed = true; // preventing timer from trigger another work cycle if(pendingTimeout) { clearTimeout(pendingTimeout); } // IF no work is currently in progress // > return a resolved promise if(!pendingWork) { return(Promise.resolve()); } await pendingWork; logWrite("Info", "Worker disposed!"); } /** Ovo se koristi samo za Unit Testing */ export const reset_dispose = () => { disposed = false; }