diff --git a/bin/simple-subnet-api.js b/bin/simple-subnet-api.js index e638191..0ae3a3a 100644 --- a/bin/simple-subnet-api.js +++ b/bin/simple-subnet-api.js @@ -1,6 +1,22 @@ import '../lib/instrument.js' import { createApp } from '../lib/app.js' -import { DATABASE_URL, HOST, PORT, REQUEST_LOGGING, poolConfig } from '../lib/config.js' +import { DATABASE_URL, HOST, PORT, REQUEST_LOGGING, poolConfig, roundServiceConfig } from '../lib/config.js' +import { TaskingService } from '../lib/tasking-service.js' +import { RoundService } from '../lib/round-service.js' +import { createPgPool } from '../lib/pool.js' + +const pool = await createPgPool(DATABASE_URL) +const taskingService = new TaskingService() +const roundService = new RoundService( + pool, + taskingService, + roundServiceConfig +) + +roundService.start().catch((error) => { + console.error('Failed to start round service:', error) + process.exit(1) +}) const app = createApp({ databaseUrl: DATABASE_URL, diff --git a/lib/config.js b/lib/config.js index fc42382..2e7cb3d 100644 --- a/lib/config.js +++ b/lib/config.js @@ -19,4 +19,9 @@ const poolConfig = { maxLifetimeSeconds: 60 } -export { DATABASE_URL, PORT, HOST, REQUEST_LOGGING, poolConfig } +const roundServiceConfig = { + roundDurationMs: 1_200_000, // 20 minutes + checkRoundExpirationIntervalMs: 60_000 // 1 minute +} + +export { DATABASE_URL, PORT, HOST, REQUEST_LOGGING, poolConfig, roundServiceConfig } diff --git a/lib/round-service.js b/lib/round-service.js new file mode 100644 index 0000000..8f8b33a --- /dev/null +++ b/lib/round-service.js @@ -0,0 +1,184 @@ +/** @typedef {{id: number; startTime: string; endTime: string; isExpired: Boolean; }} Round */ +/** @typedef {{ roundDurationMs: number; checkRoundExpirationIntervalMs: number }} RoundConfig */ + +export class RoundService { + /** + * @type {NodeJS.Timeout | null} + */ + checkRoundExpirationInterval = null + #db + #config + #taskingService + + /** + * @param {import('./typings.js').PgPool} db + * @param {import('./tasking-service.js').TaskingService} taskingService + * @param {RoundConfig} config + */ + constructor (db, taskingService, config) { + this.#db = db + this.#config = config + this.#taskingService = taskingService + } + + /** + * Start the round service + */ + async start () { + try { + await this.#initializeRound() + this.#scheduleRoundExpirationCheck() + console.log(`Round service started. Round duration: ${this.#config.roundDurationMs / 60000} minutes`) + } catch (error) { + console.error('Failed to start round service:', error) + throw error + } + } + + /** + * Stop the round service + */ + stop () { + if (this.checkRoundExpirationInterval) clearInterval(this.checkRoundExpirationInterval) + console.log('Round service stopped') + } + + /** + * Initialize the current round + */ + async #initializeRound () { + const currentRound = await this.#getCurrentActiveRound() + + if (currentRound && !currentRound.isExpired) { + console.log(`Resuming active round #${currentRound.id}`) + } else { + await this.#startNewRound() + } + } + + /** + * Schedule periodic checks for round expiration + */ + #scheduleRoundExpirationCheck () { + this.checkRoundExpirationInterval = setInterval(async () => { + try { + const currentRound = await this.#getCurrentActiveRound() + if (currentRound && !currentRound.isExpired) { + return + } + + await this.#startNewRound() + } catch (error) { + console.error('Error handling round end:', error) + } + }, this.#config.checkRoundExpirationIntervalMs) + } + + /** + * Start a new round + */ + async #startNewRound () { + const currentRound = await this.#getCurrentActiveRound() + const newRound = await this.#createNewRound() + + await this.#taskingService.generateTasksForRound(newRound.id) + if (currentRound) { + await this.#changeRoundActive({ roundId: currentRound.id, active: false }) + } + + await this.#changeRoundActive({ roundId: newRound.id, active: true }) + } + + /** + * Get the current active round from the database + * @returns {Promise} + */ + async #getCurrentActiveRound () { + try { + const { rows } = await this.#db.query(` + SELECT + cr.*, + cr.end_time <= NOW() AS is_expired + FROM checker_rounds cr + WHERE active = true + ORDER BY start_time DESC + LIMIT 1 + `) + + if (!rows.length) { + return null + } + + const [round] = rows + return { + id: round.id, + startTime: round.start_time, + endTime: round.end_time, + isExpired: round.is_expired + } + } catch (error) { + console.error('Error getting active round:', error) + throw error + } + } + + /** + * Create a new round + * + * @returns {Promise} + * @throws {Error} if the round creation fails + */ + async #createNewRound () { + try { + const { rows } = await this.#db.query(` + INSERT INTO checker_rounds (start_time, end_time, active) + VALUES ( + NOW(), + NOW() + ($1 || ' milliseconds')::INTERVAL, + $2 + ) + RETURNING *, end_time <= NOW() AS is_expired + `, [this.#config.roundDurationMs, false]) + + const [round] = rows + console.log(`Created new round #${round.id} starting at ${round.startTime}`) + return { + id: round.id, + startTime: round.start_time, + endTime: round.end_time, + isExpired: round.is_expired + } + } catch (error) { + console.error('Error creating new round:', error) + throw error + } + } + + /** + * Change the status of a round using a transaction + * @param {object} args + * @param {number} args.roundId + * @param {Boolean} args.active + */ + async #changeRoundActive ({ roundId, active }) { + const client = await this.#db.connect() + + try { + await client.query('BEGIN') + await client.query(` + UPDATE checker_rounds + SET active = $1 + WHERE id = $2`, + [active, roundId]) + await client.query('COMMIT') + + console.log(`Round #${roundId} active: ${active}`) + } catch (error) { + await client.query('ROLLBACK') + console.error('Error changing round status:', error) + throw error + } finally { + client.release() + } + } +} diff --git a/lib/tasking-service.js b/lib/tasking-service.js new file mode 100644 index 0000000..085dd88 --- /dev/null +++ b/lib/tasking-service.js @@ -0,0 +1,22 @@ +/** @typedef {any} Task */ +/** @typedef {() => Promise} TaskSamplingFn */ + +export class TaskingService { + /** + * Register a task sampler for a specific subnet + * @param {string} subnet - The subnet identifier + * @param {TaskSamplingFn} sampleFn - Function that generates tasks for a subnet + */ + registerTaskSampler (subnet, sampleFn) { + console.warn('Registering task sampler is not implemented.') + } + + /** + * Generate tasks for all registered subnets for a specific round + * @param {number} roundId + */ + async generateTasksForRound (roundId) { + // TODO: Implement the logic to generate tasks for all registered subnets + console.warn('Tasking service is not implemented.') + } +} diff --git a/migrations/003.do.checker-rounds.sql b/migrations/003.do.checker-rounds.sql new file mode 100644 index 0000000..a1251a6 --- /dev/null +++ b/migrations/003.do.checker-rounds.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS checker_rounds ( + id BIGSERIAL PRIMARY KEY, + start_time TIMESTAMPTZ NOT NULL, + end_time TIMESTAMPTZ NOT NULL, + active BOOLEAN NOT NULL DEFAULT FALSE +); diff --git a/test/round-service.test.js b/test/round-service.test.js new file mode 100644 index 0000000..990aefd --- /dev/null +++ b/test/round-service.test.js @@ -0,0 +1,104 @@ +import assert from 'assert' +import { after, before, beforeEach, describe, it } from 'node:test' +import { createPgPool } from '../lib/pool.js' +import { migrateWithPgClient } from '../lib/migrate.js' +import { DATABASE_URL } from '../lib/config.js' +import { RoundService } from '../lib/round-service.js' +import { TaskingService } from '../lib/tasking-service.js' +import { withRound } from './test-helpers.js' + +const DEFAULT_CONFIG = { + roundDurationMs: 1000, + checkRoundExpirationIntervalMs: 200 +} + +describe('RoundService', () => { + /** @type {import('pg').Pool} */ + let pgPool + /** @type {TaskingService} */ + let taskingService + + before(async () => { + pgPool = await createPgPool(DATABASE_URL) + await migrateWithPgClient(pgPool) + taskingService = new TaskingService() + }) + + after(async () => { + await pgPool.end() + }) + + beforeEach(async () => { + // Reset the database state before each test + await pgPool.query('DELETE FROM checker_rounds') + await pgPool.query('ALTER SEQUENCE checker_rounds_id_seq RESTART WITH 1') + }) + + describe('rounds', () => { + it('should create a new round if no active round exists', async () => { + const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) + + await roundService.start() + roundService.stop() + + const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') + assert.strictEqual(rounds.length, 1) + assert.ok(new Date(rounds[0].end_time) > new Date()) + }) + + it('should resume an active round if one exists', async () => { + await withRound({ + pgPool, + roundDurationMs: DEFAULT_CONFIG.roundDurationMs, + active: true + }) + + const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) + + await roundService.start() + roundService.stop() + + const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') + assert.strictEqual(rounds.length, 1) + }) + + it('should stop the round service and prevent further round checks', async () => { + const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) + + await roundService.start() + roundService.stop() + + const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') + assert.strictEqual(rounds.length, 1) + + // Wait for the check interval to pass and ensure no new rounds are created + await new Promise(resolve => setTimeout(resolve, DEFAULT_CONFIG.checkRoundExpirationIntervalMs + 1000)) + + const { rows: newRounds } = await pgPool.query('SELECT * FROM checker_rounds') + assert.strictEqual(newRounds.length, 1) + }) + }) + + describe('round transitions', () => { + it('should deactivate the old round and create a new one when the current round ends', async () => { + await withRound({ + pgPool, + roundDurationMs: 1000, // 1 second duration + active: true + }) + + const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) + + await roundService.start() + // Wait for the current round to end + await new Promise(resolve => setTimeout(resolve, 2000)) + + roundService.stop() + + const { rows: activeRounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') + const { rows: allRounds } = await pgPool.query('SELECT * FROM checker_rounds') + assert.strictEqual(activeRounds.length, 1) + assert.strictEqual(allRounds.length, 2) + }) + }) +}) diff --git a/test/test-helpers.js b/test/test-helpers.js index 1314578..78b74d6 100644 --- a/test/test-helpers.js +++ b/test/test-helpers.js @@ -47,3 +47,23 @@ export const postMeasurement = (baseUrl, subnet, measurement) => { body: JSON.stringify(measurement) }) } + +/** + * @param {object} args + * @param {import('../lib/typings.js').PgPool} args.pgPool + * @param {number} args.roundDurationMs + * @param {boolean} [args.active=false] + */ +export const withRound = async ({ pgPool, roundDurationMs, active = false }) => { + const { rows } = await pgPool.query(` + INSERT INTO checker_rounds (start_time, end_time, active) + VALUES ( + NOW(), + NOW() + ($1 || ' milliseconds')::INTERVAL, + $2 + ) + RETURNING * + `, [roundDurationMs, active]) + + return rows[0] +}