Skip to content

Commit

Permalink
Refactor jobs to use threads & update vulnerable dependencies (#44)
Browse files Browse the repository at this point in the history
* Update dependencies (vulnerabilities)

* Refactor jobs code, remove bullmq

* fix worker thread & remove bullmq

* lint

* fix lint

* quick rename variables for clarity
  • Loading branch information
mhd-hi authored Nov 1, 2024
1 parent 27ee8f5 commit 65ff3df
Show file tree
Hide file tree
Showing 20 changed files with 1,402 additions and 2,681 deletions.
5 changes: 1 addition & 4 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,4 @@ PORT=3000

DATABASE_URL="postgresql://postgres@localhost:5432/planifetsDB?schema=public"

LOG_LEVELS="log,error,warn,debug,verbose"

REDIS_HOST="localhost"
REDIS_PORT=6379
LOG_LEVELS="log,error,warn,debug" # Log levels: "log,error,warn,debug,fatal,verbose"
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* text=auto eol=lf
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"[typescript]": {
"editor.tabSize": 2,
"editor.formatOnSave": true,
"editor.defaultFormatter": "dbaeumer.vscode-eslint"
"editor.defaultFormatter": "rvest.vs-code-prettier-eslint"
},
"files.eol": "\n",
"typescript.preferences.importModuleSpecifier": "relative",
Expand Down
21 changes: 8 additions & 13 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,27 @@
"seed": "ts-node --compiler-options {\"module\":\"commonjs\"} prisma/seeds/seed.ts"
},
"dependencies": {
"@bull-board/api": "^5.21.4",
"@bull-board/express": "^5.21.4",
"@bull-board/ui": "^5.21.4",
"@nestjs/axios": "^3.0.1",
"@nestjs/bullmq": "^10.2.0",
"@nestjs/common": "^10.0.0",
"@nestjs/config": "^3.2.1",
"@nestjs/core": "^10.0.0",
"@nestjs/core": "^10.4.5",
"@nestjs/mapped-types": "^2.0.5",
"@nestjs/platform-express": "^10.4.1",
"@nestjs/schedule": "^4.1.0",
"@nestjs/swagger": "^7.4.0",
"@nestjs/swagger": "^7.4.2",
"@prisma/client": "^5.15.0",
"@types/unzipper": "^0.10.10",
"axios": "^1.7.4",
"bullmq": "^5.12.9",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.0",
"express": "^4.19.2",
"express": "^4.21.1",
"pdf2json": "^3.0.5",
"planifets-backend": "file:",
"reflect-metadata": "^0.1.13",
"rxjs": "^7.8.1",
"unzipper": "^0.12.3",
"uuid": "^9.0.1",
"webpack": "^5.0.0"
"webpack": "^5.95.0"
},
"devDependencies": {
"@nestjs/cli": "^10.0.0",
Expand All @@ -66,7 +62,7 @@
"@types/express": "^4.17.21",
"@types/jest": "^29.5.2",
"@types/multer": "^1.4.11",
"@types/node": "^20.3.1",
"@types/node": "^22.8.1",
"@types/supertest": "^2.0.12",
"@typescript-eslint/eslint-plugin": "^7.5.0",
"@typescript-eslint/parser": "^7.5.0",
Expand All @@ -80,15 +76,14 @@
"prettier": "^3.2.5",
"prettier-eslint": "^16.3.0",
"prisma": "^5.7.1",
"prisma-erd-generator": "^1.11.2",
"source-map-support": "^0.5.21",
"supertest": "^6.3.3",
"ts-jest": "^29.1.0",
"ts-loader": "^9.4.3",
"ts-node": "^10.9.2",
"tsconfig-paths": "^4.2.0",
"typescript": "^5.3.3",
"typescript-eslint": "^7.1.0"
"typescript-eslint": "^8.9.0"
},
"jest": {
"moduleFileExtensions": [
Expand All @@ -112,4 +107,4 @@
"coverageDirectory": "./coverage",
"testEnvironment": "node"
}
}
}
5 changes: 0 additions & 5 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ generator client {
provider = "prisma-client-js"
}

generator erd {
provider = "prisma-erd-generator"
theme = "neutral"
}

datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
Expand Down
12 changes: 0 additions & 12 deletions src/app.controller.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,13 @@
import { Controller, Get } from '@nestjs/common';
import { ApiOperation } from '@nestjs/swagger';

import { QueuesService } from './jobs/queues.service';

@Controller()
export class AppController {
constructor(private readonly queuesService: QueuesService) {}
@Get()
@ApiOperation({
summary: 'Hello World',
})
public getHello(): string {
return 'Hello World!';
}

//For test purposes
@Get('process')
@ApiOperation({
summary: 'Trigger the queue to process jobs',
})
public async triggerProcessPrograms() {
await this.queuesService.processJobs();
}
}
24 changes: 7 additions & 17 deletions src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { HttpModule } from '@nestjs/axios';
import { BullModule } from '@nestjs/bullmq';
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';

Expand All @@ -11,10 +10,10 @@ import config from './config/configuration';
import { CourseModule } from './course/course.module';
import { CourseInstanceModule } from './course-instance/course-instance.module';
import { CoursePrerequisiteModule } from './course-prerequisite/course-prerequisite.module';
import { CoursesProcessor } from './jobs/processors/courses.processor';
import { ProgramsProcessor } from './jobs/processors/programs.processor';
import { QueuesEnum } from './jobs/queues.enum';
import { QueuesService } from './jobs/queues.service';
import { JobsModule } from './jobs/jobs.module';
import { JobsService } from './jobs/jobs.service';
import { CoursesJobService } from './jobs/workers/courses.worker';
import { ProgramsJobService } from './jobs/workers/programs.worker';
import { PrismaModule } from './prisma/prisma.module';
import { ProgramModule } from './program/program.module';
import { ProgramCourseModule } from './program-course/program-course.module';
Expand All @@ -27,21 +26,12 @@ import { SessionModule } from './session/session.module';
load: [config],
envFilePath: '.env',
}),
BullModule.forRoot({
connection: {
host: config().redis.host,
port: config().redis.port,
},
}),
BullModule.registerQueue(
{ name: QueuesEnum.PROGRAMS },
{ name: QueuesEnum.COURSES },
),
HttpModule,
PrismaModule,
CheminotModule,
EtsModule,
PdfModule,
JobsModule,

CourseModule,
CourseInstanceModule,
Expand All @@ -50,8 +40,8 @@ import { SessionModule } from './session/session.module';
ProgramModule,
ProgramCourseModule,
],
providers: [ProgramsProcessor, CoursesProcessor, QueuesService],
providers: [ProgramsJobService, CoursesJobService, JobsService],
controllers: [AppController],
exports: [HttpModule, QueuesService],
exports: [HttpModule, JobsService],
})
export class AppModule {}
14 changes: 14 additions & 0 deletions src/jobs/jobs.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Controller, Get } from '@nestjs/common';

import { JobsService } from './jobs.service';

@Controller('jobs')
export class JobsController {
constructor(private readonly jobsService: JobsService) {}

@Get('run-workers')
public async runWorkers() {
await this.jobsService.processJobs();
return { status: 'Jobs have been triggered' };
}
}
26 changes: 26 additions & 0 deletions src/jobs/jobs.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Module } from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';

import { CheminotModule } from '../common/api-helper/cheminot/cheminot.module';
import { EtsModule } from '../common/api-helper/ets/ets.module';
import { CourseModule } from '../course/course.module';
import { ProgramModule } from '../program/program.module';
import { ProgramCourseModule } from '../program-course/program-course.module';
import { JobsController } from './jobs.controller';
import { JobsService } from './jobs.service';
import { CoursesJobService } from './workers/courses.worker';
import { ProgramsJobService } from './workers/programs.worker';

@Module({
imports: [
ScheduleModule.forRoot(),
EtsModule,
ProgramModule,
CourseModule,
ProgramCourseModule,
CheminotModule,
],
providers: [JobsService, ProgramsJobService, CoursesJobService],
controllers: process.env.NODE_ENV === 'development' ? [JobsController] : [],
})
export class JobsModule {}
64 changes: 64 additions & 0 deletions src/jobs/jobs.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { join } from 'path';
import { isMainThread, Worker } from 'worker_threads';

@Injectable()
export class JobsService {
private readonly logger = new Logger(JobsService.name);

private checkMainThread() {
this.logger.debug(
'Are we on the main thread?',
isMainThread ? 'Yes' : 'No',
);
}

private runWorker<T>(serviceName: string, methodName: string): Promise<T> {
return new Promise<T>((resolve, reject) => {
const workerScript = join(__dirname, 'workers', 'jobRunner.worker.js');
const workerData = { serviceName, methodName };
const worker = new Worker(workerScript, { workerData });

worker.on('message', (message) => {
this.logger.verbose('Worker message:', message);
resolve(message);
});

worker.on('error', (error) => {
this.logger.error('Worker error:', error);

const rejectionError =
error instanceof Error ? error : new Error(String(error));
reject(rejectionError);
});

worker.on('exit', (code) => {
if (code !== 0) {
this.logger.error(`Worker stopped with exit code ${code}`);
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
});
}

@Cron(CronExpression.EVERY_1ST_DAY_OF_MONTH_AT_MIDNIGHT)
public async processJobs(): Promise<void> {
this.logger.log('Starting job processing...');
this.checkMainThread();

try {
await Promise.all([
this.runWorker('ProgramsJobService', 'processPrograms'),
this.runWorker('CoursesJobService', 'processCourses'),
this.runWorker(
'CoursesJobService',
'syncCourseDetailsWithCheminotData',
),
]);
this.logger.log('All jobs completed successfully!');
} catch (error) {
this.logger.error('Job processing error:', error);
}
}
}
Loading

0 comments on commit 65ff3df

Please sign in to comment.