refactor: rename email-server-worker to email-worker
Rename directory from email-server-worker to email-worker for clarity and brevity. Update all references in CLAUDE.md documentation.
This commit is contained in:
165
email-worker/src/workRunner.ts
Normal file
165
email-worker/src/workRunner.ts
Normal file
@@ -0,0 +1,165 @@
|
||||
import { failedRequestCounter, requestDurationHistogram, successfulRequestCounter, totalRequestCounter } from "./lib/metricsCounters";
|
||||
import { coalesce } from "./lib/initTools";
|
||||
import { createLogger } from "./lib/logger";
|
||||
import { serializeError } from "./lib/serializeError";
|
||||
import { doWork } from "./exampleWorker";
|
||||
|
||||
/** time between two pull operations */
|
||||
const PULL_INTERVAL = parseInt(coalesce(process.env.PULL_INTERVAL, "10000"));
|
||||
const consoleLog = createLogger("server:server");
|
||||
|
||||
/** Writes entry to log */
|
||||
const logWrite = (logTitle:string, logMessage:string) => {
|
||||
consoleLog(`${logTitle}: ${logMessage}}`);
|
||||
}
|
||||
|
||||
/** Writes error to log */
|
||||
const logError = (ex: any) =>
|
||||
logWrite(serializeError(ex), "error");
|
||||
|
||||
/**
|
||||
* zastavica za zaustavljanje sinhronizacije
|
||||
*/
|
||||
let disposed:boolean = false;
|
||||
/** is worker started - prevents multiple starts */
|
||||
let workerStarted:boolean = false;
|
||||
/** Promise which is resolved once the pending work in progress is completed */
|
||||
let pendingWork:Promise<void>|undefined;
|
||||
/** Worker re-run timeout */
|
||||
let pendingTimeout:NodeJS.Timeout|undefined;
|
||||
|
||||
/** Enumeracija pojedinih statusa obrade jednog work-a */
|
||||
export enum WorkerRunnerStatus {
|
||||
init="init",
|
||||
disposed="disposed",
|
||||
beginWork="beginWork",
|
||||
updatedStats1="updatedStats1",
|
||||
updatedStats2="updatedStats2",
|
||||
stoppedStatTimer="stoppedStatTimer",
|
||||
workDone="workDone",
|
||||
newIntervalScheduled="newIntervalScheduled",
|
||||
currentWorkResolved="currentWorkResolved",
|
||||
}
|
||||
|
||||
/** Info o statusu workera */
|
||||
export type WorkerRunnerInfo = {
|
||||
/** zadnje izvršena readnja */
|
||||
status: WorkerRunnerStatus,
|
||||
/** vrijeme kada je worker zadnji puta pokrenut */
|
||||
lastWorkTime: number,
|
||||
}
|
||||
|
||||
/** Info o statusu workera, koji koristi healthcheck kako bi vidio da li stvar funkcionira */
|
||||
export const workerRunnerInfo:WorkerRunnerInfo = {
|
||||
status: WorkerRunnerStatus.init,
|
||||
lastWorkTime: Date.now()
|
||||
}
|
||||
|
||||
export const workRunner = async () => {
|
||||
|
||||
pendingTimeout = undefined;
|
||||
workerRunnerInfo.lastWorkTime = Date.now();
|
||||
workerRunnerInfo.status = WorkerRunnerStatus.beginWork;
|
||||
|
||||
// AKO je modul zaustavljen
|
||||
// -> nemoj se pokrenuti
|
||||
if(disposed) {
|
||||
workerRunnerInfo.status = WorkerRunnerStatus.disposed;
|
||||
return;
|
||||
}
|
||||
|
||||
// kreiram Promise koji omogućuje da dispose zna
|
||||
// pričekati da worker završi sa poslom (ako je u tom trenutku aktivan)
|
||||
pendingWork = new Promise(async (resolve) => {
|
||||
|
||||
try {
|
||||
totalRequestCounter.inc();
|
||||
|
||||
const stopPrometheusTimer = requestDurationHistogram.startTimer();
|
||||
|
||||
workerRunnerInfo.status = WorkerRunnerStatus.updatedStats1;
|
||||
|
||||
try {
|
||||
// ne dopuštam da stvar sruši worker
|
||||
await doWork();
|
||||
|
||||
workerRunnerInfo.status = WorkerRunnerStatus.workDone;
|
||||
|
||||
// ažuriram statistiku
|
||||
successfulRequestCounter.inc();
|
||||
|
||||
workerRunnerInfo.status = WorkerRunnerStatus.updatedStats2;
|
||||
} catch(ex:any) {
|
||||
|
||||
// ažuriram statistiku
|
||||
failedRequestCounter.inc();
|
||||
logError(ex);
|
||||
}
|
||||
|
||||
stopPrometheusTimer();
|
||||
|
||||
workerRunnerInfo.status = WorkerRunnerStatus.stoppedStatTimer;
|
||||
} catch(ex:any) {
|
||||
logError(ex);
|
||||
}
|
||||
|
||||
// nemoj pokrenuti timer ako je worker u međuvremenu disposed
|
||||
if(!disposed) {
|
||||
// pull again after timeout
|
||||
pendingTimeout = setTimeout(workRunner, PULL_INTERVAL);
|
||||
workerRunnerInfo.status = WorkerRunnerStatus.newIntervalScheduled;
|
||||
} else {
|
||||
logWrite("Info", "... exiting worker loop");
|
||||
}
|
||||
|
||||
resolve();
|
||||
|
||||
workerRunnerInfo.status = WorkerRunnerStatus.currentWorkResolved;
|
||||
|
||||
pendingWork = undefined;
|
||||
});
|
||||
|
||||
// this is an async function which must return a promise
|
||||
// > so return the promise which will be resolved once the work is done
|
||||
return(pendingWork);
|
||||
};
|
||||
|
||||
/**
|
||||
* Starts the worker
|
||||
*/
|
||||
export const startSyncWorker = () => {
|
||||
if(!workerStarted && !disposed) {
|
||||
workerStarted = true;
|
||||
workRunner();
|
||||
logWrite("Info", "Worker Started");
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops and disposes the worker
|
||||
*/
|
||||
export const disposeSyncWorker = async () => {
|
||||
logWrite("Info", "Disposing worker ...");
|
||||
|
||||
disposed = true;
|
||||
|
||||
// preventing timer from trigger another work cycle
|
||||
if(pendingTimeout) {
|
||||
clearTimeout(pendingTimeout);
|
||||
}
|
||||
|
||||
// IF no work is currently in progress
|
||||
// > return a resolved promise
|
||||
if(!pendingWork) {
|
||||
return(Promise.resolve());
|
||||
}
|
||||
|
||||
await pendingWork;
|
||||
|
||||
logWrite("Info", "Worker disposed!");
|
||||
}
|
||||
|
||||
/** Ovo se koristi samo za Unit Testing */
|
||||
export const reset_dispose = () => {
|
||||
disposed = false;
|
||||
}
|
||||
Reference in New Issue
Block a user