From d29133e8f5d7e8d5bb7670f149884eea6e50dbf3 Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Fri, 11 Apr 2025 17:23:29 +0200 Subject: [PATCH 01/24] Add round service --- bin/simple-subnet-api.js | 27 +++ lib/round-service.js | 260 +++++++++++++++++++++ migrations/003.do.checker-rounds.sql | 7 + migrations/004.do.checker-subnet-tasks.sql | 7 + test/round-service.test.js | 174 ++++++++++++++ 5 files changed, 475 insertions(+) create mode 100644 lib/round-service.js create mode 100644 migrations/003.do.checker-rounds.sql create mode 100644 migrations/004.do.checker-subnet-tasks.sql create mode 100644 test/round-service.test.js diff --git a/bin/simple-subnet-api.js b/bin/simple-subnet-api.js index e638191..563a676 100644 --- a/bin/simple-subnet-api.js +++ b/bin/simple-subnet-api.js @@ -1,6 +1,8 @@ import '../lib/instrument.js' import { createApp } from '../lib/app.js' import { DATABASE_URL, HOST, PORT, REQUEST_LOGGING, poolConfig } from '../lib/config.js' +import { RoundService } from '../lib/round-service.js' +import { createPgPool } from '../lib/pool.js' const app = createApp({ databaseUrl: DATABASE_URL, @@ -12,3 +14,28 @@ const app = createApp({ console.log('Starting the http server on host %j port %s', HOST, PORT) const serverUrl = await app.listen({ host: HOST, port: Number(PORT) }) console.log(serverUrl) + +const pool = await createPgPool(DATABASE_URL) +const roundService = new RoundService( + pool, + { + roundDurationMs: 20 * 60 * 1000, // 20 minutes + maxTasks: 100, + maxTasksPerNode: 10, + checkRoundIntervalMs: 1 * 60 * 1000 // 1 minute + } +); + +roundService.start(); + +process.on('SIGINT', async () => { + console.log('Stopping round service...'); + roundService.stop(); + process.exit(0); +}); + +process.on('SIGTERM', async () => { + console.log('Stopping round service...'); + roundService.stop(); + process.exit(0); +}); diff --git a/lib/round-service.js b/lib/round-service.js new file mode 100644 index 0000000..f1a52fc --- /dev/null +++ b/lib/round-service.js @@ -0,0 +1,260 @@ +/** @typedef {any} Task */ +/** @typedef {{id: number; start_time: string; end_time: string; max_tasks_per_node: number; }} Round */ +/** @typedef {{ roundDurationMs: number; maxTasks: number; maxTasksPerNode: number; checkRoundIntervalMs: number }} Config */ + +export class RoundService { + /** + * @type {Object} + * @description A mapping of subnet identifiers to their task sampler functions. + */ + #taskSamplers = {} + /** + * @type {Round | null} + */ + #currentRound = null + #isInitializing = false + /** + * @type {NodeJS.Timeout | null} + */ + #checkRoundIntervalId = null + #db + #config + + /** + * @param {import('./typings.js').PgPool} db + * @param {Config} config + */ + constructor (db, config) { + this.#db = db + this.#config = config + } + + /** + * Register a task sampler for a specific subnet + * @param {string} subnet - The subnet identifier + * @param {Function} samplerFn - Function that generates tasks for a subnet + */ + registerTaskSampler (subnet, samplerFn) { + if (typeof samplerFn !== 'function') { + throw new Error(`Task sampler for subnet ${subnet} must be a function`) + } + this.#taskSamplers[subnet] = samplerFn + console.log(`Task sampler registered for subnet: ${subnet}`) + } + + /** + * Start the round service + */ + async start () { + if (this.#isInitializing) return + this.#isInitializing = true + + try { + await this.#initializeRound() + this.#scheduleRoundCheck() + console.log(`Round service started. Round duration: ${this.#config.roundDurationMs / 60000} minutes`) + } catch (error) { + console.error('Failed to start round service:', error) + } finally { + this.#isInitializing = false + } + } + + /** + * Stop the round service + */ + stop () { + if (this.#checkRoundIntervalId) clearInterval(this.#checkRoundIntervalId) + console.log('Round service stopped') + } + + /** + * Initialize the current round + */ + async #initializeRound () { + const activeRound = await this.#getActiveRound() + + if (activeRound) { + this.#currentRound = activeRound + console.log(`Resuming active round #${activeRound.id}`) + } else { + await this.#startNewRound() + } + } + + /** + * Schedule periodic checks for round end + */ + #scheduleRoundCheck () { + this.#checkRoundIntervalId = setInterval(async () => { + if (!this.#currentRound) return + + const now = new Date() + if (new Date(this.#currentRound.end_time) <= now) { + try { + await this.#startNewRound() + } catch (error) { + console.error('Error handling round end:', error) + } + } + }, this.#config.checkRoundIntervalMs) + } + + /** + * Start a new round + */ + async #startNewRound () { + const previousRound = await this.#getActiveRound() + this.#currentRound = await this.#createNewRound() + if (!this.#currentRound) { + throw new Error('Failed to start a new round') + } + + if (previousRound) { + await this.changeRoundActive(previousRound.id, false) + } + + await this.changeRoundActive(this.#currentRound.id, true) + await this.#generateTasksForRound(this.#currentRound.id) + } + + /** + * Get the current active round from the database + */ + async #getActiveRound () { + try { + const { rows } = await this.#db.query(` + SELECT * FROM checker_rounds + WHERE active = true + ORDER BY start_time DESC + LIMIT 1 + `) + return rows[0] || null + } catch (error) { + console.error('Error getting active round:', error) + return null + } + } + + /** + * Create a new round + */ + async #createNewRound () { + try { + const now = new Date() + const endTime = new Date(now.getTime() + this.#config.roundDurationMs) + + const { rows } = await this.#db.query(` + INSERT INTO checker_rounds (start_time, end_time, max_tasks_per_node, active) + VALUES ($1, $2, $3, $4) + RETURNING * + `, [now, endTime, this.#config.maxTasksPerNode, true]) + + const round = rows[0] + console.log(`Created new round #${round.id} starting at ${round.start_time}`) + return round + } catch (error) { + console.error('Error creating new round:', error) + throw error + } + } + + /** + * Change the status of a round using a transaction + * @param {number} roundId + * @param {Boolean} active + */ + async changeRoundActive (roundId, active) { + const client = await this.#db.connect() + + try { + await client.query('BEGIN') + const { rows } = await client.query(` + UPDATE checker_rounds + SET active = $1 + WHERE id = $2 + RETURNING * + `, [active, roundId]) + await client.query('COMMIT') + + console.log(`Round #${rows[0].id} active: ${rows[0].active}`) + return rows[0] + } catch (error) { + await client.query('ROLLBACK') + console.error('Error changing round status:', error) + throw error + } finally { + client.release() + } + } + + /** + * Generate tasks for all registered subnets for a specific round + * @param {number} roundId + */ + async #generateTasksForRound (roundId) { + console.log(`Generating tasks for round #${roundId}`) + + const subnets = Object.keys(this.#taskSamplers) + if (subnets.length === 0) { + console.warn('No task samplers registered. No tasks will be generated.') + return + } + + await Promise.all(subnets.map(subnet => this.#generateTasksForSubnet(roundId, subnet))) + } + + /** + * Generate tasks for a specific subnet + * @param {number} roundId + * @param {string} subnet + */ + async #generateTasksForSubnet (roundId, subnet) { + try { + const sampler = this.#taskSamplers[subnet] + if (!sampler) return + + console.log(`Sampling tasks for subnet: ${subnet}`) + const tasks = await Promise.resolve(sampler()) + + if (Array.isArray(tasks) && tasks.length > 0) { + await this.#storeTasks(roundId, subnet, tasks) + console.log(`Generated ${tasks.length} tasks for subnet ${subnet} in round #${roundId}`) + } else { + console.warn(`No tasks generated for subnet ${subnet} in round #${roundId}`) + } + } catch (error) { + console.error(`Error generating tasks for subnet ${subnet}:`, error) + } + } + + /** + * Store tasks in the database + * @param {number} roundId + * @param {string} subnet + * @param {Array} tasks + */ + async #storeTasks (roundId, subnet, tasks) { + const client = await this.#db.connect() + + try { + await client.query('BEGIN') + await client.query(` + INSERT INTO checker_subnet_tasks (round_id, subnet, task_definition) + SELECT $1, $2, task_definition + FROM UNNEST($3::JSONB[]) AS t(task_definition) + `, [ + roundId, + subnet, + tasks.map(task => JSON.stringify(task)) + ]) + await client.query('COMMIT') + } catch (error) { + await client.query('ROLLBACK') + console.error(`Error storing tasks for subnet ${subnet}:`, error) + throw error + } finally { + client.release() + } + } +} diff --git a/migrations/003.do.checker-rounds.sql b/migrations/003.do.checker-rounds.sql new file mode 100644 index 0000000..5994e88 --- /dev/null +++ b/migrations/003.do.checker-rounds.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS checker_rounds ( + id BIGSERIAL PRIMARY KEY, + start_time TIMESTAMPTZ NOT NULL, + end_time TIMESTAMPTZ NOT NULL, + active BOOLEAN NOT NULL DEFAULT FALSE, + max_tasks_per_node INT NOT NULL DEFAULT 360 +); diff --git a/migrations/004.do.checker-subnet-tasks.sql b/migrations/004.do.checker-subnet-tasks.sql new file mode 100644 index 0000000..dddc093 --- /dev/null +++ b/migrations/004.do.checker-subnet-tasks.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS checker_subnet_tasks ( + id BIGSERIAL PRIMARY KEY, + subnet TEXT NOT NULL, + round_id BIGINT NOT NULL, + task_definition JSONB NOT NULL, + FOREIGN KEY (round_id) REFERENCES checker_rounds (id) ON DELETE CASCADE +); diff --git a/test/round-service.test.js b/test/round-service.test.js new file mode 100644 index 0000000..1ad7315 --- /dev/null +++ b/test/round-service.test.js @@ -0,0 +1,174 @@ +import assert from 'assert' +import { after, before, beforeEach, describe, it } from 'node:test' +import { createPgPool } from '../lib/pool.js' +import { migrateWithPgClient } from '../lib/migrate.js' +import { DATABASE_URL } from '../lib/config.js' +import { RoundService } from '../lib/round-service.js' + +const DEFAULT_CONFIG = { + roundDurationMs: 1000, + maxTasks: 100, + maxTasksPerNode: 10, + checkRoundIntervalMs: 200 +} + +describe('RoundService', () => { + /** @type {import('pg').Pool} */ + let pgPool + + before(async () => { + pgPool = await createPgPool(DATABASE_URL) + await migrateWithPgClient(pgPool) + }) + + after(async () => { + await pgPool.end() + }) + + beforeEach(async () => { + // Reset the database state before each test + await pgPool.query('DELETE FROM checker_rounds') + await pgPool.query('DELETE FROM checker_subnet_tasks') + }) + + describe('registerTaskSampler', () => { + it('should register a task sampler for a subnet', () => { + const roundService = new RoundService(pgPool, DEFAULT_CONFIG) + const samplerFn = () => { } + roundService.registerTaskSampler('subnet1', samplerFn) + + assert.doesNotThrow(() => roundService.registerTaskSampler('subnet1', samplerFn)) + }) + + it('should throw an error if samplerFn is not a function', () => { + const roundService = new RoundService(pgPool, DEFAULT_CONFIG) + assert.throws( + // @ts-ignore + () => roundService.registerTaskSampler('subnet1', null), + /Task sampler for subnet subnet1 must be a function/ + ) + }) + }) + + describe('rounds', () => { + it('should create a new round if no active round exists', async () => { + const roundService = new RoundService(pgPool, DEFAULT_CONFIG) + + await roundService.start() + roundService.stop() + + const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') + assert.strictEqual(rounds.length, 1) + assert.ok(new Date(rounds[0].end_time) > new Date()) + }) + + it('should resume an active round if one exists', async () => { + const now = new Date() + const endTime = new Date(now.getTime() + DEFAULT_CONFIG.roundDurationMs) + await pgPool.query(` + INSERT INTO checker_rounds (start_time, end_time, max_tasks_per_node, active) + VALUES ($1, $2, $3, $4) + `, [now, endTime, DEFAULT_CONFIG.maxTasksPerNode, true]) + + const roundService = new RoundService(pgPool, DEFAULT_CONFIG) + + await roundService.start() + roundService.stop() + + const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') + assert.strictEqual(rounds.length, 1) + assert.strictEqual(new Date(rounds[0].start_time).toISOString(), now.toISOString()) + }) + + it('should stop the round service and prevent further round checks', async () => { + const roundService = new RoundService(pgPool, DEFAULT_CONFIG) + + await roundService.start() + roundService.stop() + + const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') + assert.strictEqual(rounds.length, 1) + + // Wait for the check interval to pass and ensure no new rounds are created + await new Promise(resolve => setTimeout(resolve, DEFAULT_CONFIG.checkRoundIntervalMs + 1000)) + + const { rows: newRounds } = await pgPool.query('SELECT * FROM checker_rounds') + assert.strictEqual(newRounds.length, 1) + }) + }) + + describe('task generation', () => { + it('should generate tasks for all registered subnets during a round', async () => { + const roundService = new RoundService(pgPool, DEFAULT_CONFIG) + + roundService.registerTaskSampler('subnet1', async () => [ + { payloadId: 'task1', nodeId: 'node1' }, + { payloadId: 'task2', nodeId: 'node2' } + ]) + + roundService.registerTaskSampler('subnet2', async () => [ + { payloadId: 'task3', nodeId: 'node3' } + ]) + + // roundService.registerTaskSampler('subnet2', async () => { + // throw new Error('Error generating tasks') + // }) + + await roundService.start() + roundService.stop() + + const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') + const activeRoundId = rounds[0].id + + const { rows: tasks } = await pgPool.query('SELECT * FROM checker_subnet_tasks WHERE round_id = $1', [activeRoundId]) + assert.strictEqual(tasks.length, 3) + + const taskPayloads = tasks.map(task => task.task_definition) + assert.deepStrictEqual(taskPayloads.sort(), [ + { payloadId: 'task1', nodeId: 'node1' }, + { payloadId: 'task2', nodeId: 'node2' }, + { payloadId: 'task3', nodeId: 'node3' } + ]) + }) + + it('should not generate tasks if no samplers are registered', async () => { + const roundService = new RoundService(pgPool, DEFAULT_CONFIG) + + await roundService.start() + roundService.stop() + + const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') + const activeRoundId = rounds[0].id + + const { rows: tasks } = await pgPool.query('SELECT * FROM checker_subnet_tasks WHERE round_id = $1', [activeRoundId]) + assert.strictEqual(tasks.length, 0) + }) + }) + + describe('round transitions', () => { + it('should deactivate the old round and create a new one when the current round ends', async () => { + const now = new Date() + const endTime = new Date(now.getTime() + 1000) // 1 second duration + await pgPool.query(` + INSERT INTO checker_rounds (start_time, end_time, max_tasks_per_node, active) + VALUES ($1, $2, $3, $4) + `, [now, endTime, DEFAULT_CONFIG.maxTasksPerNode, true]) + + const roundService = new RoundService(pgPool, DEFAULT_CONFIG) + + await roundService.start() + + // Wait for the current round to end + await new Promise(resolve => setTimeout(resolve, 2000)) + + roundService.stop() + + const { rows: activeRounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') + assert.strictEqual(activeRounds.length, 1) + assert.ok(new Date(activeRounds[0].start_time) > endTime) + + const { rows: allRounds } = await pgPool.query('SELECT * FROM checker_rounds') + assert.strictEqual(allRounds.length, 2) + }) + }) +}) From 11eee5ad34d07d2c7d08149657a3b05f37a33a2a Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Mon, 14 Apr 2025 13:37:40 +0200 Subject: [PATCH 02/24] Separate round and tasking service --- bin/simple-subnet-api.js | 58 ++++++++++-------- lib/round-service.js | 110 +++++------------------------------ lib/tasking-service.js | 105 +++++++++++++++++++++++++++++++++ test/round-service.test.js | 82 ++++---------------------- test/tasking-service.test.js | 107 ++++++++++++++++++++++++++++++++++ 5 files changed, 270 insertions(+), 192 deletions(-) create mode 100644 lib/tasking-service.js create mode 100644 test/tasking-service.test.js diff --git a/bin/simple-subnet-api.js b/bin/simple-subnet-api.js index 563a676..3962a7d 100644 --- a/bin/simple-subnet-api.js +++ b/bin/simple-subnet-api.js @@ -3,39 +3,49 @@ import { createApp } from '../lib/app.js' import { DATABASE_URL, HOST, PORT, REQUEST_LOGGING, poolConfig } from '../lib/config.js' import { RoundService } from '../lib/round-service.js' import { createPgPool } from '../lib/pool.js' - -const app = createApp({ - databaseUrl: DATABASE_URL, - dbPoolConfig: poolConfig, - logger: { - level: ['1', 'true'].includes(REQUEST_LOGGING) ? 'info' : 'error' - } -}) -console.log('Starting the http server on host %j port %s', HOST, PORT) -const serverUrl = await app.listen({ host: HOST, port: Number(PORT) }) -console.log(serverUrl) +import { TaskingService } from '../lib/tasking-service.js' const pool = await createPgPool(DATABASE_URL) +const taskingService = new TaskingService( + pool, + { + maxTasks: 100 + } +) const roundService = new RoundService( pool, + taskingService, { - roundDurationMs: 20 * 60 * 1000, // 20 minutes - maxTasks: 100, + roundDurationMs: 1 * 60 * 1000, // 20 minutes maxTasksPerNode: 10, - checkRoundIntervalMs: 1 * 60 * 1000 // 1 minute + checkRoundIntervalMs: 1000 // 1 minute } -); +) -roundService.start(); +roundService.start().catch((error) => { + console.error('Failed to start round service:', error) + process.exit(1) +}) process.on('SIGINT', async () => { - console.log('Stopping round service...'); - roundService.stop(); - process.exit(0); -}); + console.log('Stopping round service...') + roundService.stop() + process.exit(0) +}) process.on('SIGTERM', async () => { - console.log('Stopping round service...'); - roundService.stop(); - process.exit(0); -}); + console.log('Stopping round service...') + roundService.stop() + process.exit(0) +}) + +const app = createApp({ + databaseUrl: DATABASE_URL, + dbPoolConfig: poolConfig, + logger: { + level: ['1', 'true'].includes(REQUEST_LOGGING) ? 'info' : 'error' + } +}) +console.log('Starting the http server on host %j port %s', HOST, PORT) +const serverUrl = await app.listen({ host: HOST, port: Number(PORT) }) +console.log(serverUrl) diff --git a/lib/round-service.js b/lib/round-service.js index f1a52fc..7cbfdd1 100644 --- a/lib/round-service.js +++ b/lib/round-service.js @@ -1,13 +1,7 @@ -/** @typedef {any} Task */ /** @typedef {{id: number; start_time: string; end_time: string; max_tasks_per_node: number; }} Round */ -/** @typedef {{ roundDurationMs: number; maxTasks: number; maxTasksPerNode: number; checkRoundIntervalMs: number }} Config */ +/** @typedef {{ roundDurationMs: number; maxTasksPerNode: number; checkRoundIntervalMs: number }} RoundConfig */ export class RoundService { - /** - * @type {Object} - * @description A mapping of subnet identifiers to their task sampler functions. - */ - #taskSamplers = {} /** * @type {Round | null} */ @@ -20,26 +14,17 @@ export class RoundService { #db #config + #taskingService + /** * @param {import('./typings.js').PgPool} db - * @param {Config} config + * @param {import('./tasking-service.js').TaskingService} taskingService + * @param {RoundConfig} config */ - constructor (db, config) { + constructor (db, taskingService, config) { this.#db = db this.#config = config - } - - /** - * Register a task sampler for a specific subnet - * @param {string} subnet - The subnet identifier - * @param {Function} samplerFn - Function that generates tasks for a subnet - */ - registerTaskSampler (subnet, samplerFn) { - if (typeof samplerFn !== 'function') { - throw new Error(`Task sampler for subnet ${subnet} must be a function`) - } - this.#taskSamplers[subnet] = samplerFn - console.log(`Task sampler registered for subnet: ${subnet}`) + this.#taskingService = taskingService } /** @@ -110,12 +95,15 @@ export class RoundService { throw new Error('Failed to start a new round') } + if (this.#taskingService) { + await this.#taskingService.generateTasksForRound(this.#currentRound.id) + } + if (previousRound) { - await this.changeRoundActive(previousRound.id, false) + await this.#changeRoundActive(previousRound.id, false) } - await this.changeRoundActive(this.#currentRound.id, true) - await this.#generateTasksForRound(this.#currentRound.id) + await this.#changeRoundActive(this.#currentRound.id, true) } /** @@ -164,7 +152,7 @@ export class RoundService { * @param {number} roundId * @param {Boolean} active */ - async changeRoundActive (roundId, active) { + async #changeRoundActive (roundId, active) { const client = await this.#db.connect() try { @@ -187,74 +175,4 @@ export class RoundService { client.release() } } - - /** - * Generate tasks for all registered subnets for a specific round - * @param {number} roundId - */ - async #generateTasksForRound (roundId) { - console.log(`Generating tasks for round #${roundId}`) - - const subnets = Object.keys(this.#taskSamplers) - if (subnets.length === 0) { - console.warn('No task samplers registered. No tasks will be generated.') - return - } - - await Promise.all(subnets.map(subnet => this.#generateTasksForSubnet(roundId, subnet))) - } - - /** - * Generate tasks for a specific subnet - * @param {number} roundId - * @param {string} subnet - */ - async #generateTasksForSubnet (roundId, subnet) { - try { - const sampler = this.#taskSamplers[subnet] - if (!sampler) return - - console.log(`Sampling tasks for subnet: ${subnet}`) - const tasks = await Promise.resolve(sampler()) - - if (Array.isArray(tasks) && tasks.length > 0) { - await this.#storeTasks(roundId, subnet, tasks) - console.log(`Generated ${tasks.length} tasks for subnet ${subnet} in round #${roundId}`) - } else { - console.warn(`No tasks generated for subnet ${subnet} in round #${roundId}`) - } - } catch (error) { - console.error(`Error generating tasks for subnet ${subnet}:`, error) - } - } - - /** - * Store tasks in the database - * @param {number} roundId - * @param {string} subnet - * @param {Array} tasks - */ - async #storeTasks (roundId, subnet, tasks) { - const client = await this.#db.connect() - - try { - await client.query('BEGIN') - await client.query(` - INSERT INTO checker_subnet_tasks (round_id, subnet, task_definition) - SELECT $1, $2, task_definition - FROM UNNEST($3::JSONB[]) AS t(task_definition) - `, [ - roundId, - subnet, - tasks.map(task => JSON.stringify(task)) - ]) - await client.query('COMMIT') - } catch (error) { - await client.query('ROLLBACK') - console.error(`Error storing tasks for subnet ${subnet}:`, error) - throw error - } finally { - client.release() - } - } } diff --git a/lib/tasking-service.js b/lib/tasking-service.js new file mode 100644 index 0000000..d3224b0 --- /dev/null +++ b/lib/tasking-service.js @@ -0,0 +1,105 @@ +/** @typedef {any} Task */ +/** @typedef {() => Promise} TaskSamplingFn */ +/** @typedef {{ maxTasks: number }} TaskingConfig */ + +export class TaskingService { + /** + * @type {Object} + * @description A mapping of subnet identifiers to their task sampler functions. + */ + #taskSamplers = {} + #db + #config + + /** + * @param {import('./typings.js').PgPool} db + * @param {TaskingConfig} config + */ + constructor (db, config) { + this.#db = db + this.#config = config + } + + /** + * Register a task sampler for a specific subnet + * @param {string} subnet - The subnet identifier + * @param {TaskSamplingFn} sampleFn - Function that generates tasks for a subnet + */ + registerTaskSampler (subnet, sampleFn) { + if (typeof sampleFn !== 'function') { + throw new Error(`Task sampler for subnet ${subnet} must be a function`) + } + this.#taskSamplers[subnet] = sampleFn + console.log(`Task sampler registered for subnet: ${subnet}`) + } + + /** + * Generate tasks for all registered subnets for a specific round + * @param {number} roundId + */ + async generateTasksForRound (roundId) { + console.log(`Generating tasks for round #${roundId}`) + + const subnets = Object.keys(this.#taskSamplers) + if (subnets.length === 0) { + console.warn('No task samplers registered. No tasks will be generated.') + return + } + + await Promise.all(subnets.map(subnet => this.#generateTasksForSubnet(roundId, subnet))) + } + + /** + * Generate tasks for a specific subnet + * @param {number} roundId + * @param {string} subnet + */ + async #generateTasksForSubnet (roundId, subnet) { + try { + const taskSamplingFn = this.#taskSamplers[subnet] + if (!taskSamplingFn) return + + console.log(`Sampling tasks for subnet: ${subnet}`) + const tasks = await Promise.resolve(taskSamplingFn()) + + if (Array.isArray(tasks) && tasks.length > 0) { + await this.#storeTasks(roundId, subnet, tasks) + console.log(`Generated ${tasks.length} tasks for subnet ${subnet} in round #${roundId}`) + } else { + console.warn(`No tasks generated for subnet ${subnet} in round #${roundId}`) + } + } catch (error) { + console.error(`Error generating tasks for subnet ${subnet}:`, error) + } + } + + /** + * Store tasks in the database + * @param {number} roundId + * @param {string} subnet + * @param {Array} tasks + */ + async #storeTasks (roundId, subnet, tasks) { + const client = await this.#db.connect() + + try { + await client.query('BEGIN') + await client.query(` + INSERT INTO checker_subnet_tasks (round_id, subnet, task_definition) + SELECT $1, $2, task_definition + FROM UNNEST($3::JSONB[]) AS t(task_definition) + `, [ + roundId, + subnet, + tasks.map(task => JSON.stringify(task)) + ]) + await client.query('COMMIT') + } catch (error) { + await client.query('ROLLBACK') + console.error(`Error storing tasks for subnet ${subnet}:`, error) + throw error + } finally { + client.release() + } + } +} diff --git a/test/round-service.test.js b/test/round-service.test.js index 1ad7315..05bcf2a 100644 --- a/test/round-service.test.js +++ b/test/round-service.test.js @@ -4,6 +4,7 @@ import { createPgPool } from '../lib/pool.js' import { migrateWithPgClient } from '../lib/migrate.js' import { DATABASE_URL } from '../lib/config.js' import { RoundService } from '../lib/round-service.js' +import { TaskingService } from '../lib/tasking-service.js' const DEFAULT_CONFIG = { roundDurationMs: 1000, @@ -15,10 +16,15 @@ const DEFAULT_CONFIG = { describe('RoundService', () => { /** @type {import('pg').Pool} */ let pgPool + /** @type {TaskingService} */ + let taskingService before(async () => { pgPool = await createPgPool(DATABASE_URL) await migrateWithPgClient(pgPool) + taskingService = new TaskingService(pgPool, { + maxTasks: DEFAULT_CONFIG.maxTasks + }) }) after(async () => { @@ -28,31 +34,11 @@ describe('RoundService', () => { beforeEach(async () => { // Reset the database state before each test await pgPool.query('DELETE FROM checker_rounds') - await pgPool.query('DELETE FROM checker_subnet_tasks') - }) - - describe('registerTaskSampler', () => { - it('should register a task sampler for a subnet', () => { - const roundService = new RoundService(pgPool, DEFAULT_CONFIG) - const samplerFn = () => { } - roundService.registerTaskSampler('subnet1', samplerFn) - - assert.doesNotThrow(() => roundService.registerTaskSampler('subnet1', samplerFn)) - }) - - it('should throw an error if samplerFn is not a function', () => { - const roundService = new RoundService(pgPool, DEFAULT_CONFIG) - assert.throws( - // @ts-ignore - () => roundService.registerTaskSampler('subnet1', null), - /Task sampler for subnet subnet1 must be a function/ - ) - }) }) describe('rounds', () => { it('should create a new round if no active round exists', async () => { - const roundService = new RoundService(pgPool, DEFAULT_CONFIG) + const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) await roundService.start() roundService.stop() @@ -70,7 +56,7 @@ describe('RoundService', () => { VALUES ($1, $2, $3, $4) `, [now, endTime, DEFAULT_CONFIG.maxTasksPerNode, true]) - const roundService = new RoundService(pgPool, DEFAULT_CONFIG) + const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) await roundService.start() roundService.stop() @@ -81,7 +67,7 @@ describe('RoundService', () => { }) it('should stop the round service and prevent further round checks', async () => { - const roundService = new RoundService(pgPool, DEFAULT_CONFIG) + const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) await roundService.start() roundService.stop() @@ -97,54 +83,6 @@ describe('RoundService', () => { }) }) - describe('task generation', () => { - it('should generate tasks for all registered subnets during a round', async () => { - const roundService = new RoundService(pgPool, DEFAULT_CONFIG) - - roundService.registerTaskSampler('subnet1', async () => [ - { payloadId: 'task1', nodeId: 'node1' }, - { payloadId: 'task2', nodeId: 'node2' } - ]) - - roundService.registerTaskSampler('subnet2', async () => [ - { payloadId: 'task3', nodeId: 'node3' } - ]) - - // roundService.registerTaskSampler('subnet2', async () => { - // throw new Error('Error generating tasks') - // }) - - await roundService.start() - roundService.stop() - - const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') - const activeRoundId = rounds[0].id - - const { rows: tasks } = await pgPool.query('SELECT * FROM checker_subnet_tasks WHERE round_id = $1', [activeRoundId]) - assert.strictEqual(tasks.length, 3) - - const taskPayloads = tasks.map(task => task.task_definition) - assert.deepStrictEqual(taskPayloads.sort(), [ - { payloadId: 'task1', nodeId: 'node1' }, - { payloadId: 'task2', nodeId: 'node2' }, - { payloadId: 'task3', nodeId: 'node3' } - ]) - }) - - it('should not generate tasks if no samplers are registered', async () => { - const roundService = new RoundService(pgPool, DEFAULT_CONFIG) - - await roundService.start() - roundService.stop() - - const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') - const activeRoundId = rounds[0].id - - const { rows: tasks } = await pgPool.query('SELECT * FROM checker_subnet_tasks WHERE round_id = $1', [activeRoundId]) - assert.strictEqual(tasks.length, 0) - }) - }) - describe('round transitions', () => { it('should deactivate the old round and create a new one when the current round ends', async () => { const now = new Date() @@ -154,7 +92,7 @@ describe('RoundService', () => { VALUES ($1, $2, $3, $4) `, [now, endTime, DEFAULT_CONFIG.maxTasksPerNode, true]) - const roundService = new RoundService(pgPool, DEFAULT_CONFIG) + const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) await roundService.start() diff --git a/test/tasking-service.test.js b/test/tasking-service.test.js new file mode 100644 index 0000000..4f96843 --- /dev/null +++ b/test/tasking-service.test.js @@ -0,0 +1,107 @@ +import assert from 'assert' +import { after, before, beforeEach, describe, it } from 'node:test' +import { createPgPool } from '../lib/pool.js' +import { migrateWithPgClient } from '../lib/migrate.js' +import { DATABASE_URL } from '../lib/config.js' +import { TaskingService } from '../lib/tasking-service.js' + +const DEFAULT_CONFIG = { + maxTasks: 100 +} + +describe('TaskingService', () => { + /** @type {import('pg').Pool} */ + let pgPool + + before(async () => { + pgPool = await createPgPool(DATABASE_URL) + await migrateWithPgClient(pgPool) + }) + + after(async () => { + await pgPool.end() + }) + + beforeEach(async () => { + // Reset the database state before each test + await pgPool.query('DELETE FROM checker_subnet_tasks') + }) + + describe('registerTaskSampler', () => { + it('should register a task sampler for a subnet', () => { + const taskingService = new TaskingService(pgPool, DEFAULT_CONFIG) + const samplerFn = async () => [] + taskingService.registerTaskSampler('subnet1', samplerFn) + + assert.doesNotThrow(() => taskingService.registerTaskSampler('subnet1', samplerFn)) + }) + + it('should throw an error if samplerFn is not a function', () => { + const taskingService = new TaskingService(pgPool, DEFAULT_CONFIG) + assert.throws( + // @ts-ignore + () => taskingService.registerTaskSampler('subnet1', null), + /Task sampler for subnet subnet1 must be a function/ + ) + }) + }) + + describe('task generation', () => { + it('should generate tasks for all registered subnets that dont throw errors', async () => { + const taskingService = new TaskingService(pgPool, DEFAULT_CONFIG) + + taskingService.registerTaskSampler('subnet1', async () => [ + { payloadId: 'task1', nodeId: 'node1' }, + { payloadId: 'task2', nodeId: 'node2' } + ]) + + taskingService.registerTaskSampler('subnet2', async () => [ + { payloadId: 'task3', nodeId: 'node3' } + ]) + + taskingService.registerTaskSampler('subnet3', async () => { + throw new Error('Error sampling tasks') + }) + + const mockRound = await givenRound(pgPool) + await taskingService.generateTasksForRound(mockRound.id) + + console.log('mockRound.id', mockRound.id) + + const { rows: tasks } = await pgPool.query('SELECT * FROM checker_subnet_tasks WHERE round_id = $1', [mockRound.id]) + + const taskPayloads = tasks.map(task => task.task_definition) + assert.deepStrictEqual(taskPayloads.sort(), [ + { payloadId: 'task1', nodeId: 'node1' }, + { payloadId: 'task2', nodeId: 'node2' }, + { payloadId: 'task3', nodeId: 'node3' } + ]) + }) + + it('should not generate tasks if no samplers are registered', async () => { + const taskingService = new TaskingService(pgPool, DEFAULT_CONFIG) + + const round = await givenRound(pgPool) + taskingService.generateTasksForRound(round.id) + + const { rows: tasks } = await pgPool.query('SELECT * FROM checker_subnet_tasks WHERE round_id = $1', [round.id]) + assert.strictEqual(tasks.length, 0) + }) + }) +}) + +/** + * + * @param {import('../lib/typings.js').PgPool} pgPool + */ +const givenRound = async (pgPool, maxTasksPerNode = 100) => { + const now = new Date() + const endTime = new Date(now.getTime() + 1000) // 1 second duration + const { rows } = await pgPool.query(` + INSERT INTO checker_rounds (start_time, end_time, max_tasks_per_node, active) + VALUES ($1, $2, $3, $4) + RETURNING * + `, [now, endTime, maxTasksPerNode, false]) + + return rows[0] +} From 7e0a4e2f2529a929f964a969da7aa2a1d99e4b93 Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Mon, 14 Apr 2025 13:54:32 +0200 Subject: [PATCH 03/24] Remove tasking implementation --- bin/simple-subnet-api.js | 37 ------------ lib/tasking-service.js | 76 +------------------------ test/tasking-service.test.js | 107 ----------------------------------- 3 files changed, 3 insertions(+), 217 deletions(-) delete mode 100644 test/tasking-service.test.js diff --git a/bin/simple-subnet-api.js b/bin/simple-subnet-api.js index 3962a7d..e638191 100644 --- a/bin/simple-subnet-api.js +++ b/bin/simple-subnet-api.js @@ -1,43 +1,6 @@ import '../lib/instrument.js' import { createApp } from '../lib/app.js' import { DATABASE_URL, HOST, PORT, REQUEST_LOGGING, poolConfig } from '../lib/config.js' -import { RoundService } from '../lib/round-service.js' -import { createPgPool } from '../lib/pool.js' -import { TaskingService } from '../lib/tasking-service.js' - -const pool = await createPgPool(DATABASE_URL) -const taskingService = new TaskingService( - pool, - { - maxTasks: 100 - } -) -const roundService = new RoundService( - pool, - taskingService, - { - roundDurationMs: 1 * 60 * 1000, // 20 minutes - maxTasksPerNode: 10, - checkRoundIntervalMs: 1000 // 1 minute - } -) - -roundService.start().catch((error) => { - console.error('Failed to start round service:', error) - process.exit(1) -}) - -process.on('SIGINT', async () => { - console.log('Stopping round service...') - roundService.stop() - process.exit(0) -}) - -process.on('SIGTERM', async () => { - console.log('Stopping round service...') - roundService.stop() - process.exit(0) -}) const app = createApp({ databaseUrl: DATABASE_URL, diff --git a/lib/tasking-service.js b/lib/tasking-service.js index d3224b0..63683ad 100644 --- a/lib/tasking-service.js +++ b/lib/tasking-service.js @@ -3,11 +3,6 @@ /** @typedef {{ maxTasks: number }} TaskingConfig */ export class TaskingService { - /** - * @type {Object} - * @description A mapping of subnet identifiers to their task sampler functions. - */ - #taskSamplers = {} #db #config @@ -26,11 +21,7 @@ export class TaskingService { * @param {TaskSamplingFn} sampleFn - Function that generates tasks for a subnet */ registerTaskSampler (subnet, sampleFn) { - if (typeof sampleFn !== 'function') { - throw new Error(`Task sampler for subnet ${subnet} must be a function`) - } - this.#taskSamplers[subnet] = sampleFn - console.log(`Task sampler registered for subnet: ${subnet}`) + console.warn('Registering task sampler is not implmented.') } /** @@ -38,68 +29,7 @@ export class TaskingService { * @param {number} roundId */ async generateTasksForRound (roundId) { - console.log(`Generating tasks for round #${roundId}`) - - const subnets = Object.keys(this.#taskSamplers) - if (subnets.length === 0) { - console.warn('No task samplers registered. No tasks will be generated.') - return - } - - await Promise.all(subnets.map(subnet => this.#generateTasksForSubnet(roundId, subnet))) - } - - /** - * Generate tasks for a specific subnet - * @param {number} roundId - * @param {string} subnet - */ - async #generateTasksForSubnet (roundId, subnet) { - try { - const taskSamplingFn = this.#taskSamplers[subnet] - if (!taskSamplingFn) return - - console.log(`Sampling tasks for subnet: ${subnet}`) - const tasks = await Promise.resolve(taskSamplingFn()) - - if (Array.isArray(tasks) && tasks.length > 0) { - await this.#storeTasks(roundId, subnet, tasks) - console.log(`Generated ${tasks.length} tasks for subnet ${subnet} in round #${roundId}`) - } else { - console.warn(`No tasks generated for subnet ${subnet} in round #${roundId}`) - } - } catch (error) { - console.error(`Error generating tasks for subnet ${subnet}:`, error) - } - } - - /** - * Store tasks in the database - * @param {number} roundId - * @param {string} subnet - * @param {Array} tasks - */ - async #storeTasks (roundId, subnet, tasks) { - const client = await this.#db.connect() - - try { - await client.query('BEGIN') - await client.query(` - INSERT INTO checker_subnet_tasks (round_id, subnet, task_definition) - SELECT $1, $2, task_definition - FROM UNNEST($3::JSONB[]) AS t(task_definition) - `, [ - roundId, - subnet, - tasks.map(task => JSON.stringify(task)) - ]) - await client.query('COMMIT') - } catch (error) { - await client.query('ROLLBACK') - console.error(`Error storing tasks for subnet ${subnet}:`, error) - throw error - } finally { - client.release() - } + // TODO: Implement the logic to generate tasks for all registered subnets + console.warn('Tasking service is not implemented.') } } diff --git a/test/tasking-service.test.js b/test/tasking-service.test.js deleted file mode 100644 index 4f96843..0000000 --- a/test/tasking-service.test.js +++ /dev/null @@ -1,107 +0,0 @@ -import assert from 'assert' -import { after, before, beforeEach, describe, it } from 'node:test' -import { createPgPool } from '../lib/pool.js' -import { migrateWithPgClient } from '../lib/migrate.js' -import { DATABASE_URL } from '../lib/config.js' -import { TaskingService } from '../lib/tasking-service.js' - -const DEFAULT_CONFIG = { - maxTasks: 100 -} - -describe('TaskingService', () => { - /** @type {import('pg').Pool} */ - let pgPool - - before(async () => { - pgPool = await createPgPool(DATABASE_URL) - await migrateWithPgClient(pgPool) - }) - - after(async () => { - await pgPool.end() - }) - - beforeEach(async () => { - // Reset the database state before each test - await pgPool.query('DELETE FROM checker_subnet_tasks') - }) - - describe('registerTaskSampler', () => { - it('should register a task sampler for a subnet', () => { - const taskingService = new TaskingService(pgPool, DEFAULT_CONFIG) - const samplerFn = async () => [] - taskingService.registerTaskSampler('subnet1', samplerFn) - - assert.doesNotThrow(() => taskingService.registerTaskSampler('subnet1', samplerFn)) - }) - - it('should throw an error if samplerFn is not a function', () => { - const taskingService = new TaskingService(pgPool, DEFAULT_CONFIG) - assert.throws( - // @ts-ignore - () => taskingService.registerTaskSampler('subnet1', null), - /Task sampler for subnet subnet1 must be a function/ - ) - }) - }) - - describe('task generation', () => { - it('should generate tasks for all registered subnets that dont throw errors', async () => { - const taskingService = new TaskingService(pgPool, DEFAULT_CONFIG) - - taskingService.registerTaskSampler('subnet1', async () => [ - { payloadId: 'task1', nodeId: 'node1' }, - { payloadId: 'task2', nodeId: 'node2' } - ]) - - taskingService.registerTaskSampler('subnet2', async () => [ - { payloadId: 'task3', nodeId: 'node3' } - ]) - - taskingService.registerTaskSampler('subnet3', async () => { - throw new Error('Error sampling tasks') - }) - - const mockRound = await givenRound(pgPool) - await taskingService.generateTasksForRound(mockRound.id) - - console.log('mockRound.id', mockRound.id) - - const { rows: tasks } = await pgPool.query('SELECT * FROM checker_subnet_tasks WHERE round_id = $1', [mockRound.id]) - - const taskPayloads = tasks.map(task => task.task_definition) - assert.deepStrictEqual(taskPayloads.sort(), [ - { payloadId: 'task1', nodeId: 'node1' }, - { payloadId: 'task2', nodeId: 'node2' }, - { payloadId: 'task3', nodeId: 'node3' } - ]) - }) - - it('should not generate tasks if no samplers are registered', async () => { - const taskingService = new TaskingService(pgPool, DEFAULT_CONFIG) - - const round = await givenRound(pgPool) - taskingService.generateTasksForRound(round.id) - - const { rows: tasks } = await pgPool.query('SELECT * FROM checker_subnet_tasks WHERE round_id = $1', [round.id]) - assert.strictEqual(tasks.length, 0) - }) - }) -}) - -/** - * - * @param {import('../lib/typings.js').PgPool} pgPool - */ -const givenRound = async (pgPool, maxTasksPerNode = 100) => { - const now = new Date() - const endTime = new Date(now.getTime() + 1000) // 1 second duration - const { rows } = await pgPool.query(` - INSERT INTO checker_rounds (start_time, end_time, max_tasks_per_node, active) - VALUES ($1, $2, $3, $4) - RETURNING * - `, [now, endTime, maxTasksPerNode, false]) - - return rows[0] -} From 91fa889b416715d7379c476655e53ba966f031e8 Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Mon, 14 Apr 2025 13:55:47 +0200 Subject: [PATCH 04/24] Remove checker tasks table --- migrations/004.do.checker-subnet-tasks.sql | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 migrations/004.do.checker-subnet-tasks.sql diff --git a/migrations/004.do.checker-subnet-tasks.sql b/migrations/004.do.checker-subnet-tasks.sql deleted file mode 100644 index dddc093..0000000 --- a/migrations/004.do.checker-subnet-tasks.sql +++ /dev/null @@ -1,7 +0,0 @@ -CREATE TABLE IF NOT EXISTS checker_subnet_tasks ( - id BIGSERIAL PRIMARY KEY, - subnet TEXT NOT NULL, - round_id BIGINT NOT NULL, - task_definition JSONB NOT NULL, - FOREIGN KEY (round_id) REFERENCES checker_rounds (id) ON DELETE CASCADE -); From bf300bf495816ea659acb806a7ba7f0751c550f3 Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Mon, 14 Apr 2025 13:56:50 +0200 Subject: [PATCH 05/24] Register round service --- bin/simple-subnet-api.js | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/bin/simple-subnet-api.js b/bin/simple-subnet-api.js index e638191..7ef2e67 100644 --- a/bin/simple-subnet-api.js +++ b/bin/simple-subnet-api.js @@ -1,6 +1,43 @@ import '../lib/instrument.js' import { createApp } from '../lib/app.js' import { DATABASE_URL, HOST, PORT, REQUEST_LOGGING, poolConfig } from '../lib/config.js' +import { TaskingService } from '../lib/tasking-service.js' +import { RoundService } from '../lib/round-service.js' +import { createPgPool } from '../lib/pool.js' + +const pool = await createPgPool(DATABASE_URL) +const taskingService = new TaskingService( + pool, + { + maxTasks: 100 + } +) +const roundService = new RoundService( + pool, + taskingService, + { + roundDurationMs: 1 * 60 * 1000, // 20 minutes + maxTasksPerNode: 10, + checkRoundIntervalMs: 1000 // 1 minute + } +) + +roundService.start().catch((error) => { + console.error('Failed to start round service:', error) + process.exit(1) +}) + +process.on('SIGINT', async () => { + console.log('Stopping round service...') + roundService.stop() + process.exit(0) +}) + +process.on('SIGTERM', async () => { + console.log('Stopping round service...') + roundService.stop() + process.exit(0) +}) const app = createApp({ databaseUrl: DATABASE_URL, From 538a697b3540fea0938a39e631fd61d269df1a5a Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Mon, 14 Apr 2025 14:03:05 +0200 Subject: [PATCH 06/24] Add round and tasking configs --- bin/simple-subnet-api.js | 12 +++--------- lib/config.js | 12 +++++++++++- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/bin/simple-subnet-api.js b/bin/simple-subnet-api.js index 7ef2e67..6cf9db0 100644 --- a/bin/simple-subnet-api.js +++ b/bin/simple-subnet-api.js @@ -1,6 +1,6 @@ import '../lib/instrument.js' import { createApp } from '../lib/app.js' -import { DATABASE_URL, HOST, PORT, REQUEST_LOGGING, poolConfig } from '../lib/config.js' +import { DATABASE_URL, HOST, PORT, REQUEST_LOGGING, poolConfig, roundServiceConfig, taskingServiceConfig } from '../lib/config.js' import { TaskingService } from '../lib/tasking-service.js' import { RoundService } from '../lib/round-service.js' import { createPgPool } from '../lib/pool.js' @@ -8,18 +8,12 @@ import { createPgPool } from '../lib/pool.js' const pool = await createPgPool(DATABASE_URL) const taskingService = new TaskingService( pool, - { - maxTasks: 100 - } + taskingServiceConfig ) const roundService = new RoundService( pool, taskingService, - { - roundDurationMs: 1 * 60 * 1000, // 20 minutes - maxTasksPerNode: 10, - checkRoundIntervalMs: 1000 // 1 minute - } + roundServiceConfig ) roundService.start().catch((error) => { diff --git a/lib/config.js b/lib/config.js index fc42382..1b2c582 100644 --- a/lib/config.js +++ b/lib/config.js @@ -19,4 +19,14 @@ const poolConfig = { maxLifetimeSeconds: 60 } -export { DATABASE_URL, PORT, HOST, REQUEST_LOGGING, poolConfig } +const roundServiceConfig = { + roundDurationMs: 1 * 60 * 1000, // 20 minutes + maxTasksPerNode: 10, + checkRoundIntervalMs: 1000 // 1 minute +} + +const taskingServiceConfig = { + maxTasks: 100 +} + +export { DATABASE_URL, PORT, HOST, REQUEST_LOGGING, poolConfig, roundServiceConfig, taskingServiceConfig } From 0617456e7b6e9e423474cff5035cd57d63e684fe Mon Sep 17 00:00:00 2001 From: Srdjan Date: Mon, 14 Apr 2025 15:42:55 +0200 Subject: [PATCH 07/24] Fix config.js Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- lib/config.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/config.js b/lib/config.js index 1b2c582..e6b5366 100644 --- a/lib/config.js +++ b/lib/config.js @@ -20,7 +20,7 @@ const poolConfig = { } const roundServiceConfig = { - roundDurationMs: 1 * 60 * 1000, // 20 minutes + roundDurationMs: 20 * 60 * 1000, // 20 minutes maxTasksPerNode: 10, checkRoundIntervalMs: 1000 // 1 minute } From fb6591acd35fc9f282c5530fb90757b3df5033c9 Mon Sep 17 00:00:00 2001 From: Srdjan Date: Mon, 14 Apr 2025 15:43:01 +0200 Subject: [PATCH 08/24] Update lib/tasking-service.js Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- lib/tasking-service.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/tasking-service.js b/lib/tasking-service.js index 63683ad..8bec516 100644 --- a/lib/tasking-service.js +++ b/lib/tasking-service.js @@ -21,7 +21,7 @@ export class TaskingService { * @param {TaskSamplingFn} sampleFn - Function that generates tasks for a subnet */ registerTaskSampler (subnet, sampleFn) { - console.warn('Registering task sampler is not implmented.') + console.warn('Registering task sampler is not implemented.') } /** From 85b80e08a625f3000ff30173745a98a9b1c199c4 Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Mon, 14 Apr 2025 15:43:30 +0200 Subject: [PATCH 09/24] Fix config --- lib/config.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/config.js b/lib/config.js index e6b5366..82781ea 100644 --- a/lib/config.js +++ b/lib/config.js @@ -22,7 +22,7 @@ const poolConfig = { const roundServiceConfig = { roundDurationMs: 20 * 60 * 1000, // 20 minutes maxTasksPerNode: 10, - checkRoundIntervalMs: 1000 // 1 minute + checkRoundIntervalMs: 60_000 // 1 minute } const taskingServiceConfig = { From c581ec7fcf7c5330d9065e78c49be4b4a2a391cf Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Mon, 14 Apr 2025 19:02:03 +0200 Subject: [PATCH 10/24] Remove unused round property --- lib/config.js | 3 +-- lib/round-service.js | 9 ++++----- migrations/003.do.checker-rounds.sql | 3 +-- test/round-service.test.js | 1 - 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/lib/config.js b/lib/config.js index 82781ea..93d3e45 100644 --- a/lib/config.js +++ b/lib/config.js @@ -20,8 +20,7 @@ const poolConfig = { } const roundServiceConfig = { - roundDurationMs: 20 * 60 * 1000, // 20 minutes - maxTasksPerNode: 10, + roundDurationMs: 1_200_000, // 20 minutes checkRoundIntervalMs: 60_000 // 1 minute } diff --git a/lib/round-service.js b/lib/round-service.js index 7cbfdd1..8af3668 100644 --- a/lib/round-service.js +++ b/lib/round-service.js @@ -1,5 +1,5 @@ /** @typedef {{id: number; start_time: string; end_time: string; max_tasks_per_node: number; }} Round */ -/** @typedef {{ roundDurationMs: number; maxTasksPerNode: number; checkRoundIntervalMs: number }} RoundConfig */ +/** @typedef {{ roundDurationMs: number; checkRoundIntervalMs: number }} RoundConfig */ export class RoundService { /** @@ -13,7 +13,6 @@ export class RoundService { #checkRoundIntervalId = null #db #config - #taskingService /** @@ -133,10 +132,10 @@ export class RoundService { const endTime = new Date(now.getTime() + this.#config.roundDurationMs) const { rows } = await this.#db.query(` - INSERT INTO checker_rounds (start_time, end_time, max_tasks_per_node, active) - VALUES ($1, $2, $3, $4) + INSERT INTO checker_rounds (start_time, end_time, active) + VALUES ($1, $2, $3) RETURNING * - `, [now, endTime, this.#config.maxTasksPerNode, true]) + `, [now, endTime, true]) const round = rows[0] console.log(`Created new round #${round.id} starting at ${round.start_time}`) diff --git a/migrations/003.do.checker-rounds.sql b/migrations/003.do.checker-rounds.sql index 5994e88..a1251a6 100644 --- a/migrations/003.do.checker-rounds.sql +++ b/migrations/003.do.checker-rounds.sql @@ -2,6 +2,5 @@ CREATE TABLE IF NOT EXISTS checker_rounds ( id BIGSERIAL PRIMARY KEY, start_time TIMESTAMPTZ NOT NULL, end_time TIMESTAMPTZ NOT NULL, - active BOOLEAN NOT NULL DEFAULT FALSE, - max_tasks_per_node INT NOT NULL DEFAULT 360 + active BOOLEAN NOT NULL DEFAULT FALSE ); diff --git a/test/round-service.test.js b/test/round-service.test.js index 05bcf2a..d2c4d01 100644 --- a/test/round-service.test.js +++ b/test/round-service.test.js @@ -9,7 +9,6 @@ import { TaskingService } from '../lib/tasking-service.js' const DEFAULT_CONFIG = { roundDurationMs: 1000, maxTasks: 100, - maxTasksPerNode: 10, checkRoundIntervalMs: 200 } From 348e3bb9ffd2d69fed547e53b8e3511e0cf88339 Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Mon, 14 Apr 2025 19:03:50 +0200 Subject: [PATCH 11/24] Remove unused properties --- lib/tasking-service.js | 13 ------------- test/round-service.test.js | 17 +++++++---------- 2 files changed, 7 insertions(+), 23 deletions(-) diff --git a/lib/tasking-service.js b/lib/tasking-service.js index 8bec516..085dd88 100644 --- a/lib/tasking-service.js +++ b/lib/tasking-service.js @@ -1,20 +1,7 @@ /** @typedef {any} Task */ /** @typedef {() => Promise} TaskSamplingFn */ -/** @typedef {{ maxTasks: number }} TaskingConfig */ export class TaskingService { - #db - #config - - /** - * @param {import('./typings.js').PgPool} db - * @param {TaskingConfig} config - */ - constructor (db, config) { - this.#db = db - this.#config = config - } - /** * Register a task sampler for a specific subnet * @param {string} subnet - The subnet identifier diff --git a/test/round-service.test.js b/test/round-service.test.js index d2c4d01..cb19598 100644 --- a/test/round-service.test.js +++ b/test/round-service.test.js @@ -8,7 +8,6 @@ import { TaskingService } from '../lib/tasking-service.js' const DEFAULT_CONFIG = { roundDurationMs: 1000, - maxTasks: 100, checkRoundIntervalMs: 200 } @@ -21,9 +20,7 @@ describe('RoundService', () => { before(async () => { pgPool = await createPgPool(DATABASE_URL) await migrateWithPgClient(pgPool) - taskingService = new TaskingService(pgPool, { - maxTasks: DEFAULT_CONFIG.maxTasks - }) + taskingService = new TaskingService() }) after(async () => { @@ -51,9 +48,9 @@ describe('RoundService', () => { const now = new Date() const endTime = new Date(now.getTime() + DEFAULT_CONFIG.roundDurationMs) await pgPool.query(` - INSERT INTO checker_rounds (start_time, end_time, max_tasks_per_node, active) - VALUES ($1, $2, $3, $4) - `, [now, endTime, DEFAULT_CONFIG.maxTasksPerNode, true]) + INSERT INTO checker_rounds (start_time, end_time, active) + VALUES ($1, $2, $3) + `, [now, endTime, true]) const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) @@ -87,9 +84,9 @@ describe('RoundService', () => { const now = new Date() const endTime = new Date(now.getTime() + 1000) // 1 second duration await pgPool.query(` - INSERT INTO checker_rounds (start_time, end_time, max_tasks_per_node, active) - VALUES ($1, $2, $3, $4) - `, [now, endTime, DEFAULT_CONFIG.maxTasksPerNode, true]) + INSERT INTO checker_rounds (start_time, end_time, active) + VALUES ($1, $2, $3) + `, [now, endTime, true]) const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) From 639783f1b37d1a0c26ba65acf394e920909c1496 Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Mon, 14 Apr 2025 19:09:12 +0200 Subject: [PATCH 12/24] Remove unused config --- bin/simple-subnet-api.js | 7 ++----- lib/config.js | 6 +----- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/bin/simple-subnet-api.js b/bin/simple-subnet-api.js index 6cf9db0..e9d606b 100644 --- a/bin/simple-subnet-api.js +++ b/bin/simple-subnet-api.js @@ -1,15 +1,12 @@ import '../lib/instrument.js' import { createApp } from '../lib/app.js' -import { DATABASE_URL, HOST, PORT, REQUEST_LOGGING, poolConfig, roundServiceConfig, taskingServiceConfig } from '../lib/config.js' +import { DATABASE_URL, HOST, PORT, REQUEST_LOGGING, poolConfig, roundServiceConfig } from '../lib/config.js' import { TaskingService } from '../lib/tasking-service.js' import { RoundService } from '../lib/round-service.js' import { createPgPool } from '../lib/pool.js' const pool = await createPgPool(DATABASE_URL) -const taskingService = new TaskingService( - pool, - taskingServiceConfig -) +const taskingService = new TaskingService() const roundService = new RoundService( pool, taskingService, diff --git a/lib/config.js b/lib/config.js index 93d3e45..c4b8e20 100644 --- a/lib/config.js +++ b/lib/config.js @@ -24,8 +24,4 @@ const roundServiceConfig = { checkRoundIntervalMs: 60_000 // 1 minute } -const taskingServiceConfig = { - maxTasks: 100 -} - -export { DATABASE_URL, PORT, HOST, REQUEST_LOGGING, poolConfig, roundServiceConfig, taskingServiceConfig } +export { DATABASE_URL, PORT, HOST, REQUEST_LOGGING, poolConfig, roundServiceConfig } From f1329c2236a2e309586e1a6a49b9fc75dd1d3098 Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Mon, 14 Apr 2025 19:37:17 +0200 Subject: [PATCH 13/24] Fix setting expired round as current active round --- lib/round-service.js | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/lib/round-service.js b/lib/round-service.js index 8af3668..b2eb09e 100644 --- a/lib/round-service.js +++ b/lib/round-service.js @@ -58,7 +58,7 @@ export class RoundService { async #initializeRound () { const activeRound = await this.#getActiveRound() - if (activeRound) { + if (activeRound && !this.#isRoundExpired(activeRound)) { this.#currentRound = activeRound console.log(`Resuming active round #${activeRound.id}`) } else { @@ -174,4 +174,12 @@ export class RoundService { client.release() } } + + /** + * @param {{ end_time: string | number | Date; }} round + */ + #isRoundExpired (round) { + const now = new Date() + return new Date(round.end_time) <= now + } } From 2cac5a03257473e1154afad0b3f4ba18dbc5d5c7 Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Tue, 15 Apr 2025 09:47:02 +0200 Subject: [PATCH 14/24] Use test helpers to create round --- test/round-service.test.js | 32 +++++++++++++++++++------------- test/test-helpers.js | 17 +++++++++++++++++ 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/test/round-service.test.js b/test/round-service.test.js index cb19598..35ac705 100644 --- a/test/round-service.test.js +++ b/test/round-service.test.js @@ -5,6 +5,7 @@ import { migrateWithPgClient } from '../lib/migrate.js' import { DATABASE_URL } from '../lib/config.js' import { RoundService } from '../lib/round-service.js' import { TaskingService } from '../lib/tasking-service.js' +import { withRound } from './test-helpers.js' const DEFAULT_CONFIG = { roundDurationMs: 1000, @@ -30,6 +31,7 @@ describe('RoundService', () => { beforeEach(async () => { // Reset the database state before each test await pgPool.query('DELETE FROM checker_rounds') + await pgPool.query('ALTER SEQUENCE checker_rounds_id_seq RESTART WITH 1') }) describe('rounds', () => { @@ -45,12 +47,14 @@ describe('RoundService', () => { }) it('should resume an active round if one exists', async () => { - const now = new Date() - const endTime = new Date(now.getTime() + DEFAULT_CONFIG.roundDurationMs) - await pgPool.query(` - INSERT INTO checker_rounds (start_time, end_time, active) - VALUES ($1, $2, $3) - `, [now, endTime, true]) + const startTime = new Date() + const endTime = new Date(startTime.getTime() + DEFAULT_CONFIG.roundDurationMs) + await withRound({ + pgPool, + startTime, + endTime, + active: true + }) const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) @@ -59,7 +63,7 @@ describe('RoundService', () => { const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') assert.strictEqual(rounds.length, 1) - assert.strictEqual(new Date(rounds[0].start_time).toISOString(), now.toISOString()) + assert.strictEqual(new Date(rounds[0].start_time).toISOString(), startTime.toISOString()) }) it('should stop the round service and prevent further round checks', async () => { @@ -81,12 +85,14 @@ describe('RoundService', () => { describe('round transitions', () => { it('should deactivate the old round and create a new one when the current round ends', async () => { - const now = new Date() - const endTime = new Date(now.getTime() + 1000) // 1 second duration - await pgPool.query(` - INSERT INTO checker_rounds (start_time, end_time, active) - VALUES ($1, $2, $3) - `, [now, endTime, true]) + const startTime = new Date() + const endTime = new Date(startTime.getTime() + 1000) // 1 second duration + await withRound({ + pgPool, + startTime, + endTime, + active: true + }) const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) diff --git a/test/test-helpers.js b/test/test-helpers.js index 1314578..5b54fa5 100644 --- a/test/test-helpers.js +++ b/test/test-helpers.js @@ -47,3 +47,20 @@ export const postMeasurement = (baseUrl, subnet, measurement) => { body: JSON.stringify(measurement) }) } + +/** + * @param {object} args + * @param {import('../lib/typings.js').PgPool} args.pgPool + * @param {Date} args.startTime + * @param {Date} args.endTime + * @param {boolean} [args.active=false] + */ +export const withRound = async ({ pgPool, startTime, endTime, active = false }) => { + const { rows } = await pgPool.query(` + INSERT INTO checker_rounds (start_time, end_time, active) + VALUES ($1, $2, $3) + RETURNING * + `, [startTime, endTime, active]) + + return rows[0] +} From f073bc62fce59a46c53a5a59e547a6894f8f8c2a Mon Sep 17 00:00:00 2001 From: Srdjan Date: Tue, 15 Apr 2025 11:38:47 +0200 Subject: [PATCH 15/24] Update bin/simple-subnet-api.js Co-authored-by: Julian Gruber --- bin/simple-subnet-api.js | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/bin/simple-subnet-api.js b/bin/simple-subnet-api.js index e9d606b..0ae3a3a 100644 --- a/bin/simple-subnet-api.js +++ b/bin/simple-subnet-api.js @@ -18,18 +18,6 @@ roundService.start().catch((error) => { process.exit(1) }) -process.on('SIGINT', async () => { - console.log('Stopping round service...') - roundService.stop() - process.exit(0) -}) - -process.on('SIGTERM', async () => { - console.log('Stopping round service...') - roundService.stop() - process.exit(0) -}) - const app = createApp({ databaseUrl: DATABASE_URL, dbPoolConfig: poolConfig, From e9f6bfe821a6218e69e47ab6de188762ac7effa4 Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Tue, 15 Apr 2025 11:52:39 +0200 Subject: [PATCH 16/24] Refactor round service --- lib/round-service.js | 41 ++++++++++++++++++----------------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/lib/round-service.js b/lib/round-service.js index b2eb09e..393bacd 100644 --- a/lib/round-service.js +++ b/lib/round-service.js @@ -1,4 +1,4 @@ -/** @typedef {{id: number; start_time: string; end_time: string; max_tasks_per_node: number; }} Round */ +/** @typedef {{id: number; start_time: string; end_time: string; }} Round */ /** @typedef {{ roundDurationMs: number; checkRoundIntervalMs: number }} RoundConfig */ export class RoundService { @@ -6,7 +6,6 @@ export class RoundService { * @type {Round | null} */ #currentRound = null - #isInitializing = false /** * @type {NodeJS.Timeout | null} */ @@ -30,17 +29,13 @@ export class RoundService { * Start the round service */ async start () { - if (this.#isInitializing) return - this.#isInitializing = true - try { await this.#initializeRound() this.#scheduleRoundCheck() console.log(`Round service started. Round duration: ${this.#config.roundDurationMs / 60000} minutes`) } catch (error) { console.error('Failed to start round service:', error) - } finally { - this.#isInitializing = false + throw error } } @@ -90,19 +85,14 @@ export class RoundService { async #startNewRound () { const previousRound = await this.#getActiveRound() this.#currentRound = await this.#createNewRound() - if (!this.#currentRound) { - throw new Error('Failed to start a new round') - } - if (this.#taskingService) { - await this.#taskingService.generateTasksForRound(this.#currentRound.id) - } + await this.#taskingService.generateTasksForRound(this.#currentRound.id) if (previousRound) { - await this.#changeRoundActive(previousRound.id, false) + await this.#changeRoundActive({ roundId: previousRound.id, active: false }) } - await this.#changeRoundActive(this.#currentRound.id, true) + await this.#changeRoundActive({ roundId: this.#currentRound.id, active: true }) } /** @@ -125,17 +115,21 @@ export class RoundService { /** * Create a new round + * + * @returns {Promise} + * @throws {Error} if the round creation fails */ async #createNewRound () { try { - const now = new Date() - const endTime = new Date(now.getTime() + this.#config.roundDurationMs) - const { rows } = await this.#db.query(` INSERT INTO checker_rounds (start_time, end_time, active) - VALUES ($1, $2, $3) + VALUES ( + NOW(), + NOW() + ($1 || ' milliseconds')::INTERVAL, + $2 + ) RETURNING * - `, [now, endTime, true]) + `, [this.#config.roundDurationMs, false]) const round = rows[0] console.log(`Created new round #${round.id} starting at ${round.start_time}`) @@ -148,10 +142,11 @@ export class RoundService { /** * Change the status of a round using a transaction - * @param {number} roundId - * @param {Boolean} active + * @param {object} args + * @param {number} args.roundId + * @param {Boolean} args.active */ - async #changeRoundActive (roundId, active) { + async #changeRoundActive ({ roundId, active }) { const client = await this.#db.connect() try { From 1a37ac28ce73ac393138508f7aedc119449de30e Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Tue, 15 Apr 2025 11:56:27 +0200 Subject: [PATCH 17/24] Test round expiry on database level --- lib/round-service.js | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/lib/round-service.js b/lib/round-service.js index 393bacd..2ade23f 100644 --- a/lib/round-service.js +++ b/lib/round-service.js @@ -53,7 +53,7 @@ export class RoundService { async #initializeRound () { const activeRound = await this.#getActiveRound() - if (activeRound && !this.#isRoundExpired(activeRound)) { + if (activeRound && !activeRound.is_expired) { this.#currentRound = activeRound console.log(`Resuming active round #${activeRound.id}`) } else { @@ -97,11 +97,15 @@ export class RoundService { /** * Get the current active round from the database + * @returns {Promise} */ async #getActiveRound () { try { const { rows } = await this.#db.query(` - SELECT * FROM checker_rounds + SELECT + cr.*, + cr.end_time <= NOW() AS is_expired + FROM checker_rounds cr WHERE active = true ORDER BY start_time DESC LIMIT 1 @@ -169,12 +173,4 @@ export class RoundService { client.release() } } - - /** - * @param {{ end_time: string | number | Date; }} round - */ - #isRoundExpired (round) { - const now = new Date() - return new Date(round.end_time) <= now - } } From 5cfe64ed377ceb9b64d94eaa4bc7aba3b9ea6777 Mon Sep 17 00:00:00 2001 From: Srdjan Date: Tue, 15 Apr 2025 11:57:38 +0200 Subject: [PATCH 18/24] Update lib/round-service.js Co-authored-by: Julian Gruber --- lib/round-service.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/round-service.js b/lib/round-service.js index 2ade23f..dcd6f4a 100644 --- a/lib/round-service.js +++ b/lib/round-service.js @@ -35,6 +35,7 @@ export class RoundService { console.log(`Round service started. Round duration: ${this.#config.roundDurationMs / 60000} minutes`) } catch (error) { console.error('Failed to start round service:', error) + throw err throw error } } From 5a3085dd0c2b7c98f3b62c88e8a5afcb7fe21117 Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Tue, 15 Apr 2025 12:43:37 +0200 Subject: [PATCH 19/24] Refactor round service to use database level time --- lib/round-service.js | 42 ++++++++++++++++---------------------- test/round-service.test.js | 16 +++------------ test/test-helpers.js | 13 +++++++----- 3 files changed, 29 insertions(+), 42 deletions(-) diff --git a/lib/round-service.js b/lib/round-service.js index 2ade23f..8354c3e 100644 --- a/lib/round-service.js +++ b/lib/round-service.js @@ -2,10 +2,6 @@ /** @typedef {{ roundDurationMs: number; checkRoundIntervalMs: number }} RoundConfig */ export class RoundService { - /** - * @type {Round | null} - */ - #currentRound = null /** * @type {NodeJS.Timeout | null} */ @@ -51,11 +47,10 @@ export class RoundService { * Initialize the current round */ async #initializeRound () { - const activeRound = await this.#getActiveRound() + const currentRound = await this.#getCurrentActiveRound() - if (activeRound && !activeRound.is_expired) { - this.#currentRound = activeRound - console.log(`Resuming active round #${activeRound.id}`) + if (currentRound && !currentRound.is_expired) { + console.log(`Resuming active round #${currentRound.id}`) } else { await this.#startNewRound() } @@ -66,15 +61,15 @@ export class RoundService { */ #scheduleRoundCheck () { this.#checkRoundIntervalId = setInterval(async () => { - if (!this.#currentRound) return - - const now = new Date() - if (new Date(this.#currentRound.end_time) <= now) { - try { - await this.#startNewRound() - } catch (error) { - console.error('Error handling round end:', error) + try { + const currentRound = await this.#getCurrentActiveRound() + if (currentRound && !currentRound.is_expired) { + return } + + await this.#startNewRound() + } catch (error) { + console.error('Error handling round end:', error) } }, this.#config.checkRoundIntervalMs) } @@ -83,23 +78,22 @@ export class RoundService { * Start a new round */ async #startNewRound () { - const previousRound = await this.#getActiveRound() - this.#currentRound = await this.#createNewRound() - - await this.#taskingService.generateTasksForRound(this.#currentRound.id) + const currentRound = await this.#getCurrentActiveRound() + const newRound = await this.#createNewRound() - if (previousRound) { - await this.#changeRoundActive({ roundId: previousRound.id, active: false }) + await this.#taskingService.generateTasksForRound(newRound.id) + if (currentRound) { + await this.#changeRoundActive({ roundId: currentRound.id, active: false }) } - await this.#changeRoundActive({ roundId: this.#currentRound.id, active: true }) + await this.#changeRoundActive({ roundId: newRound.id, active: true }) } /** * Get the current active round from the database * @returns {Promise} */ - async #getActiveRound () { + async #getCurrentActiveRound () { try { const { rows } = await this.#db.query(` SELECT diff --git a/test/round-service.test.js b/test/round-service.test.js index 35ac705..be87a28 100644 --- a/test/round-service.test.js +++ b/test/round-service.test.js @@ -47,12 +47,9 @@ describe('RoundService', () => { }) it('should resume an active round if one exists', async () => { - const startTime = new Date() - const endTime = new Date(startTime.getTime() + DEFAULT_CONFIG.roundDurationMs) await withRound({ pgPool, - startTime, - endTime, + roundDurationMs: DEFAULT_CONFIG.roundDurationMs, active: true }) @@ -63,7 +60,6 @@ describe('RoundService', () => { const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') assert.strictEqual(rounds.length, 1) - assert.strictEqual(new Date(rounds[0].start_time).toISOString(), startTime.toISOString()) }) it('should stop the round service and prevent further round checks', async () => { @@ -85,29 +81,23 @@ describe('RoundService', () => { describe('round transitions', () => { it('should deactivate the old round and create a new one when the current round ends', async () => { - const startTime = new Date() - const endTime = new Date(startTime.getTime() + 1000) // 1 second duration await withRound({ pgPool, - startTime, - endTime, + roundDurationMs: 1000, // 1 second duration active: true }) const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) await roundService.start() - // Wait for the current round to end await new Promise(resolve => setTimeout(resolve, 2000)) roundService.stop() const { rows: activeRounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') - assert.strictEqual(activeRounds.length, 1) - assert.ok(new Date(activeRounds[0].start_time) > endTime) - const { rows: allRounds } = await pgPool.query('SELECT * FROM checker_rounds') + assert.strictEqual(activeRounds.length, 1) assert.strictEqual(allRounds.length, 2) }) }) diff --git a/test/test-helpers.js b/test/test-helpers.js index 5b54fa5..78b74d6 100644 --- a/test/test-helpers.js +++ b/test/test-helpers.js @@ -51,16 +51,19 @@ export const postMeasurement = (baseUrl, subnet, measurement) => { /** * @param {object} args * @param {import('../lib/typings.js').PgPool} args.pgPool - * @param {Date} args.startTime - * @param {Date} args.endTime + * @param {number} args.roundDurationMs * @param {boolean} [args.active=false] */ -export const withRound = async ({ pgPool, startTime, endTime, active = false }) => { +export const withRound = async ({ pgPool, roundDurationMs, active = false }) => { const { rows } = await pgPool.query(` INSERT INTO checker_rounds (start_time, end_time, active) - VALUES ($1, $2, $3) + VALUES ( + NOW(), + NOW() + ($1 || ' milliseconds')::INTERVAL, + $2 + ) RETURNING * - `, [startTime, endTime, active]) + `, [roundDurationMs, active]) return rows[0] } From a93dd4cf24ecd9338e4e64bacbb6fd02d5d68b18 Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Tue, 15 Apr 2025 12:49:52 +0200 Subject: [PATCH 20/24] Fix throwing error --- lib/round-service.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/round-service.js b/lib/round-service.js index 25edfc7..8bb0f17 100644 --- a/lib/round-service.js +++ b/lib/round-service.js @@ -31,7 +31,6 @@ export class RoundService { console.log(`Round service started. Round duration: ${this.#config.roundDurationMs / 60000} minutes`) } catch (error) { console.error('Failed to start round service:', error) - throw err throw error } } @@ -108,7 +107,7 @@ export class RoundService { return rows[0] || null } catch (error) { console.error('Error getting active round:', error) - return null + throw error } } From 3cf95d75d1c42c4c2227ab26804037d2d95ab83c Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Tue, 15 Apr 2025 13:04:18 +0200 Subject: [PATCH 21/24] Rename checkRoundIntervalMs to checkRoundExpirationIntervalMs --- lib/config.js | 2 +- lib/round-service.js | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/config.js b/lib/config.js index c4b8e20..2e7cb3d 100644 --- a/lib/config.js +++ b/lib/config.js @@ -21,7 +21,7 @@ const poolConfig = { const roundServiceConfig = { roundDurationMs: 1_200_000, // 20 minutes - checkRoundIntervalMs: 60_000 // 1 minute + checkRoundExpirationIntervalMs: 60_000 // 1 minute } export { DATABASE_URL, PORT, HOST, REQUEST_LOGGING, poolConfig, roundServiceConfig } diff --git a/lib/round-service.js b/lib/round-service.js index 8bb0f17..0402f14 100644 --- a/lib/round-service.js +++ b/lib/round-service.js @@ -1,11 +1,11 @@ /** @typedef {{id: number; start_time: string; end_time: string; }} Round */ -/** @typedef {{ roundDurationMs: number; checkRoundIntervalMs: number }} RoundConfig */ +/** @typedef {{ roundDurationMs: number; checkRoundExpirationIntervalMs: number }} RoundConfig */ export class RoundService { /** * @type {NodeJS.Timeout | null} */ - #checkRoundIntervalId = null + checkRoundExpirationInterval = null #db #config #taskingService @@ -27,7 +27,7 @@ export class RoundService { async start () { try { await this.#initializeRound() - this.#scheduleRoundCheck() + this.#scheduleRoundExpirationCheck() console.log(`Round service started. Round duration: ${this.#config.roundDurationMs / 60000} minutes`) } catch (error) { console.error('Failed to start round service:', error) @@ -39,7 +39,7 @@ export class RoundService { * Stop the round service */ stop () { - if (this.#checkRoundIntervalId) clearInterval(this.#checkRoundIntervalId) + if (this.checkRoundExpirationInterval) clearInterval(this.checkRoundExpirationInterval) console.log('Round service stopped') } @@ -57,10 +57,10 @@ export class RoundService { } /** - * Schedule periodic checks for round end + * Schedule periodic checks for round expiration */ - #scheduleRoundCheck () { - this.#checkRoundIntervalId = setInterval(async () => { + #scheduleRoundExpirationCheck () { + this.checkRoundExpirationInterval = setInterval(async () => { try { const currentRound = await this.#getCurrentActiveRound() if (currentRound && !currentRound.is_expired) { @@ -71,7 +71,7 @@ export class RoundService { } catch (error) { console.error('Error handling round end:', error) } - }, this.#config.checkRoundIntervalMs) + }, this.#config.checkRoundExpirationIntervalMs) } /** From 1ff9a62f0bd32642e03b4cc576b4111172774595 Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Tue, 15 Apr 2025 13:20:06 +0200 Subject: [PATCH 22/24] Use camel case --- lib/round-service.js | 41 +++++++++++++++++++++++++------------- test/round-service.test.js | 4 ++-- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/lib/round-service.js b/lib/round-service.js index 0402f14..20f0b9a 100644 --- a/lib/round-service.js +++ b/lib/round-service.js @@ -1,4 +1,4 @@ -/** @typedef {{id: number; start_time: string; end_time: string; }} Round */ +/** @typedef {{id: number; startTime: string; endTime: string; isExpired: Boolean; }} Round */ /** @typedef {{ roundDurationMs: number; checkRoundExpirationIntervalMs: number }} RoundConfig */ export class RoundService { @@ -49,7 +49,7 @@ export class RoundService { async #initializeRound () { const currentRound = await this.#getCurrentActiveRound() - if (currentRound && !currentRound.is_expired) { + if (currentRound && !currentRound.isExpired) { console.log(`Resuming active round #${currentRound.id}`) } else { await this.#startNewRound() @@ -63,7 +63,7 @@ export class RoundService { this.checkRoundExpirationInterval = setInterval(async () => { try { const currentRound = await this.#getCurrentActiveRound() - if (currentRound && !currentRound.is_expired) { + if (currentRound && !currentRound.isExpired) { return } @@ -91,7 +91,7 @@ export class RoundService { /** * Get the current active round from the database - * @returns {Promise} + * @returns {Promise} */ async #getCurrentActiveRound () { try { @@ -104,7 +104,18 @@ export class RoundService { ORDER BY start_time DESC LIMIT 1 `) - return rows[0] || null + + if (!rows || rows.length === 0) { + console.log('No active round found') + return null + } + + return { + id: rows[0].id, + startTime: rows[0].start_time, + endTime: rows[0].end_time, + isExpired: rows[0].is_expired + } } catch (error) { console.error('Error getting active round:', error) throw error @@ -129,9 +140,13 @@ export class RoundService { RETURNING * `, [this.#config.roundDurationMs, false]) - const round = rows[0] - console.log(`Created new round #${round.id} starting at ${round.start_time}`) - return round + console.log(`Created new round #${rows[0].id} starting at ${rows[0].startTime}`) + return { + id: rows[0].id, + startTime: rows[0].start_time, + endTime: rows[0].end_time, + isExpired: false + } } catch (error) { console.error('Error creating new round:', error) throw error @@ -149,16 +164,14 @@ export class RoundService { try { await client.query('BEGIN') - const { rows } = await client.query(` + await client.query(` UPDATE checker_rounds SET active = $1 - WHERE id = $2 - RETURNING * - `, [active, roundId]) + WHERE id = $2`, + [active, roundId]) await client.query('COMMIT') - console.log(`Round #${rows[0].id} active: ${rows[0].active}`) - return rows[0] + console.log(`Round #${roundId} active: ${active}`) } catch (error) { await client.query('ROLLBACK') console.error('Error changing round status:', error) diff --git a/test/round-service.test.js b/test/round-service.test.js index be87a28..990aefd 100644 --- a/test/round-service.test.js +++ b/test/round-service.test.js @@ -9,7 +9,7 @@ import { withRound } from './test-helpers.js' const DEFAULT_CONFIG = { roundDurationMs: 1000, - checkRoundIntervalMs: 200 + checkRoundExpirationIntervalMs: 200 } describe('RoundService', () => { @@ -72,7 +72,7 @@ describe('RoundService', () => { assert.strictEqual(rounds.length, 1) // Wait for the check interval to pass and ensure no new rounds are created - await new Promise(resolve => setTimeout(resolve, DEFAULT_CONFIG.checkRoundIntervalMs + 1000)) + await new Promise(resolve => setTimeout(resolve, DEFAULT_CONFIG.checkRoundExpirationIntervalMs + 1000)) const { rows: newRounds } = await pgPool.query('SELECT * FROM checker_rounds') assert.strictEqual(newRounds.length, 1) From 8a1fe17ffdce2bb7c9166a0d66eb8e03febad4ce Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Tue, 15 Apr 2025 13:23:10 +0200 Subject: [PATCH 23/24] Remove logs --- lib/round-service.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/round-service.js b/lib/round-service.js index 20f0b9a..7f8e02e 100644 --- a/lib/round-service.js +++ b/lib/round-service.js @@ -106,7 +106,6 @@ export class RoundService { `) if (!rows || rows.length === 0) { - console.log('No active round found') return null } From a37eb0b125eb0072d8f98a596bb535b72fe9462b Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Tue, 15 Apr 2025 14:44:42 +0200 Subject: [PATCH 24/24] Refactor round service --- lib/round-service.js | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/lib/round-service.js b/lib/round-service.js index 7f8e02e..8f8b33a 100644 --- a/lib/round-service.js +++ b/lib/round-service.js @@ -91,7 +91,7 @@ export class RoundService { /** * Get the current active round from the database - * @returns {Promise} + * @returns {Promise} */ async #getCurrentActiveRound () { try { @@ -105,15 +105,16 @@ export class RoundService { LIMIT 1 `) - if (!rows || rows.length === 0) { + if (!rows.length) { return null } + const [round] = rows return { - id: rows[0].id, - startTime: rows[0].start_time, - endTime: rows[0].end_time, - isExpired: rows[0].is_expired + id: round.id, + startTime: round.start_time, + endTime: round.end_time, + isExpired: round.is_expired } } catch (error) { console.error('Error getting active round:', error) @@ -136,15 +137,16 @@ export class RoundService { NOW() + ($1 || ' milliseconds')::INTERVAL, $2 ) - RETURNING * + RETURNING *, end_time <= NOW() AS is_expired `, [this.#config.roundDurationMs, false]) - console.log(`Created new round #${rows[0].id} starting at ${rows[0].startTime}`) + const [round] = rows + console.log(`Created new round #${round.id} starting at ${round.startTime}`) return { - id: rows[0].id, - startTime: rows[0].start_time, - endTime: rows[0].end_time, - isExpired: false + id: round.id, + startTime: round.start_time, + endTime: round.end_time, + isExpired: round.is_expired } } catch (error) { console.error('Error creating new round:', error)