Skip to content

add: Round service #48

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d29133e
Add round service
pyropy Apr 11, 2025
11eee5a
Separate round and tasking service
pyropy Apr 14, 2025
7e0a4e2
Remove tasking implementation
pyropy Apr 14, 2025
91fa889
Remove checker tasks table
pyropy Apr 14, 2025
bf300bf
Register round service
pyropy Apr 14, 2025
538a697
Add round and tasking configs
pyropy Apr 14, 2025
0617456
Fix config.js
pyropy Apr 14, 2025
fb6591a
Update lib/tasking-service.js
pyropy Apr 14, 2025
85b80e0
Fix config
pyropy Apr 14, 2025
c581ec7
Remove unused round property
pyropy Apr 14, 2025
348e3bb
Remove unused properties
pyropy Apr 14, 2025
639783f
Remove unused config
pyropy Apr 14, 2025
f1329c2
Fix setting expired round as current active round
pyropy Apr 14, 2025
2cac5a0
Use test helpers to create round
pyropy Apr 15, 2025
f073bc6
Update bin/simple-subnet-api.js
pyropy Apr 15, 2025
e9f6bfe
Refactor round service
pyropy Apr 15, 2025
c5d9c11
Merge branch 'add/round-service' of github.com:CheckerNetwork/simple-…
pyropy Apr 15, 2025
1a37ac2
Test round expiry on database level
pyropy Apr 15, 2025
5cfe64e
Update lib/round-service.js
pyropy Apr 15, 2025
5a3085d
Refactor round service to use database level time
pyropy Apr 15, 2025
4b15395
Merge branch 'add/round-service' of github.com:CheckerNetwork/simple-…
pyropy Apr 15, 2025
a93dd4c
Fix throwing error
pyropy Apr 15, 2025
3cf95d7
Rename checkRoundIntervalMs to checkRoundExpirationIntervalMs
pyropy Apr 15, 2025
1ff9a62
Use camel case
pyropy Apr 15, 2025
8a1fe17
Remove logs
pyropy Apr 15, 2025
a37eb0b
Refactor round service
pyropy Apr 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion bin/simple-subnet-api.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
import '../lib/instrument.js'
import { createApp } from '../lib/app.js'
import { DATABASE_URL, HOST, PORT, REQUEST_LOGGING, poolConfig } from '../lib/config.js'
import { DATABASE_URL, HOST, PORT, REQUEST_LOGGING, poolConfig, roundServiceConfig } from '../lib/config.js'
import { TaskingService } from '../lib/tasking-service.js'
import { RoundService } from '../lib/round-service.js'
import { createPgPool } from '../lib/pool.js'

const pool = await createPgPool(DATABASE_URL)
const taskingService = new TaskingService()
const roundService = new RoundService(
pool,
taskingService,
roundServiceConfig
)

roundService.start().catch((error) => {
console.error('Failed to start round service:', error)
process.exit(1)
})

const app = createApp({
databaseUrl: DATABASE_URL,
Expand Down
7 changes: 6 additions & 1 deletion lib/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,9 @@ const poolConfig = {
maxLifetimeSeconds: 60
}

export { DATABASE_URL, PORT, HOST, REQUEST_LOGGING, poolConfig }
const roundServiceConfig = {
roundDurationMs: 1_200_000, // 20 minutes
checkRoundExpirationIntervalMs: 60_000 // 1 minute
}

export { DATABASE_URL, PORT, HOST, REQUEST_LOGGING, poolConfig, roundServiceConfig }
184 changes: 184 additions & 0 deletions lib/round-service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/** @typedef {{id: number; startTime: string; endTime: string; isExpired: Boolean; }} Round */
/** @typedef {{ roundDurationMs: number; checkRoundExpirationIntervalMs: number }} RoundConfig */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be inferred from lib/config.js? Something like type RoundConfig = typeof config.roundServiceConfig


export class RoundService {
/**
* @type {NodeJS.Timeout | null}
*/
checkRoundExpirationInterval = null
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this key not private as well?

#db
#config
#taskingService

/**
* @param {import('./typings.js').PgPool} db
* @param {import('./tasking-service.js').TaskingService} taskingService
* @param {RoundConfig} config
*/
constructor (db, taskingService, config) {
this.#db = db
this.#config = config
this.#taskingService = taskingService
}

/**
* Start the round service
*/
async start () {
try {
await this.#initializeRound()
this.#scheduleRoundExpirationCheck()
console.log(`Round service started. Round duration: ${this.#config.roundDurationMs / 60000} minutes`)
} catch (error) {
console.error('Failed to start round service:', error)
throw error
}
}

/**
* Stop the round service
*/
stop () {
if (this.checkRoundExpirationInterval) clearInterval(this.checkRoundExpirationInterval)
console.log('Round service stopped')
}

/**
* Initialize the current round
*/
async #initializeRound () {
const currentRound = await this.#getCurrentActiveRound()

if (currentRound && !currentRound.isExpired) {
console.log(`Resuming active round #${currentRound.id}`)
} else {
await this.#startNewRound()
}
}

/**
* Schedule periodic checks for round expiration
*/
#scheduleRoundExpirationCheck () {
this.checkRoundExpirationInterval = setInterval(async () => {
try {
const currentRound = await this.#getCurrentActiveRound()
if (currentRound && !currentRound.isExpired) {
return
}

await this.#startNewRound()
Comment on lines +66 to +70
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is repeated in #initializeRound() can we refactor into a function #maybeStartNewRound()?

} catch (error) {
console.error('Error handling round end:', error)
}
}, this.#config.checkRoundExpirationIntervalMs)
}

/**
* Start a new round
*/
async #startNewRound () {
const currentRound = await this.#getCurrentActiveRound()
const newRound = await this.#createNewRound()

await this.#taskingService.generateTasksForRound(newRound.id)
if (currentRound) {
await this.#changeRoundActive({ roundId: currentRound.id, active: false })
}

await this.#changeRoundActive({ roundId: newRound.id, active: true })
}

/**
* Get the current active round from the database
* @returns {Promise<Round | null>}
*/
async #getCurrentActiveRound () {
try {
const { rows } = await this.#db.query(`
SELECT
cr.*,
cr.end_time <= NOW() AS is_expired
FROM checker_rounds cr
WHERE active = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of the active field? Can we simplify this to querying what is the last round where end_time > NOW()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Motivation behind the active field was to switch on round to active after the task creation. Unlike the implementation in the spark-api where tasks are created during the round creation on the database level, here we call TaskingService to create tasks for the round.

Thing that I'm questioning is should we or should we not leave the last round as active in case the TaskingService fails to sample / insert tasks for the new round?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, you can only consider the round active once tasks have been sampled, but you can also only sample tasks once the round exists, right?

Can we then rename active to has_had_tasks_defined? This way the purpose to me is more clear. Another way is to JOIN on the tasks for that round, and simply consider any round as inactive if it has no matching tasks. This assumes we don't want to have empty rounds.

ORDER BY start_time DESC
LIMIT 1
`)

if (!rows.length) {
return null
}

const [round] = rows
return {
id: round.id,
startTime: round.start_time,
endTime: round.end_time,
isExpired: round.is_expired
}
} catch (error) {
console.error('Error getting active round:', error)
throw error
}
}

/**
* Create a new round
*
* @returns {Promise<Round>}
* @throws {Error} if the round creation fails
*/
async #createNewRound () {
try {
const { rows } = await this.#db.query(`
INSERT INTO checker_rounds (start_time, end_time, active)
VALUES (
NOW(),
NOW() + ($1 || ' milliseconds')::INTERVAL,
$2
)
RETURNING *, end_time <= NOW() AS is_expired
`, [this.#config.roundDurationMs, false])

const [round] = rows
console.log(`Created new round #${round.id} starting at ${round.startTime}`)
return {
id: round.id,
startTime: round.start_time,
endTime: round.end_time,
isExpired: round.is_expired
}
} catch (error) {
console.error('Error creating new round:', error)
throw error
}
}

/**
* Change the status of a round using a transaction
* @param {object} args
* @param {number} args.roundId
* @param {Boolean} args.active
*/
async #changeRoundActive ({ roundId, active }) {
const client = await this.#db.connect()

try {
await client.query('BEGIN')
await client.query(`
UPDATE checker_rounds
SET active = $1
WHERE id = $2`,
[active, roundId])
await client.query('COMMIT')

console.log(`Round #${roundId} active: ${active}`)
} catch (error) {
await client.query('ROLLBACK')
console.error('Error changing round status:', error)
throw error
} finally {
client.release()
}
}
}
22 changes: 22 additions & 0 deletions lib/tasking-service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/** @typedef {any} Task */
/** @typedef {() => Promise<Task[]>} TaskSamplingFn */

export class TaskingService {
/**
* Register a task sampler for a specific subnet
* @param {string} subnet - The subnet identifier
* @param {TaskSamplingFn} sampleFn - Function that generates tasks for a subnet
*/
registerTaskSampler (subnet, sampleFn) {
console.warn('Registering task sampler is not implemented.')
}

/**
* Generate tasks for all registered subnets for a specific round
* @param {number} roundId
*/
async generateTasksForRound (roundId) {
// TODO: Implement the logic to generate tasks for all registered subnets
console.warn('Tasking service is not implemented.')
}
}
6 changes: 6 additions & 0 deletions migrations/003.do.checker-rounds.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS checker_rounds (
id BIGSERIAL PRIMARY KEY,
start_time TIMESTAMPTZ NOT NULL,
end_time TIMESTAMPTZ NOT NULL,
active BOOLEAN NOT NULL DEFAULT FALSE
);
104 changes: 104 additions & 0 deletions test/round-service.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import assert from 'assert'
import { after, before, beforeEach, describe, it } from 'node:test'
import { createPgPool } from '../lib/pool.js'
import { migrateWithPgClient } from '../lib/migrate.js'
import { DATABASE_URL } from '../lib/config.js'
import { RoundService } from '../lib/round-service.js'
import { TaskingService } from '../lib/tasking-service.js'
import { withRound } from './test-helpers.js'

const DEFAULT_CONFIG = {
roundDurationMs: 1000,
checkRoundExpirationIntervalMs: 200
}

describe('RoundService', () => {
/** @type {import('pg').Pool} */
let pgPool
/** @type {TaskingService} */
let taskingService

before(async () => {
pgPool = await createPgPool(DATABASE_URL)
await migrateWithPgClient(pgPool)
taskingService = new TaskingService()
})

after(async () => {
await pgPool.end()
})

beforeEach(async () => {
// Reset the database state before each test
await pgPool.query('DELETE FROM checker_rounds')
await pgPool.query('ALTER SEQUENCE checker_rounds_id_seq RESTART WITH 1')
})

describe('rounds', () => {
it('should create a new round if no active round exists', async () => {
const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG)

await roundService.start()
roundService.stop()

const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true')
assert.strictEqual(rounds.length, 1)
assert.ok(new Date(rounds[0].end_time) > new Date())
})

it('should resume an active round if one exists', async () => {
await withRound({
pgPool,
roundDurationMs: DEFAULT_CONFIG.roundDurationMs,
active: true
})

const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG)

await roundService.start()
roundService.stop()

const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true')
assert.strictEqual(rounds.length, 1)
})

it('should stop the round service and prevent further round checks', async () => {
const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG)

await roundService.start()
roundService.stop()

const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true')
assert.strictEqual(rounds.length, 1)

// Wait for the check interval to pass and ensure no new rounds are created
await new Promise(resolve => setTimeout(resolve, DEFAULT_CONFIG.checkRoundExpirationIntervalMs + 1000))

const { rows: newRounds } = await pgPool.query('SELECT * FROM checker_rounds')
assert.strictEqual(newRounds.length, 1)
})
})

describe('round transitions', () => {
it('should deactivate the old round and create a new one when the current round ends', async () => {
await withRound({
pgPool,
roundDurationMs: 1000, // 1 second duration
active: true
})

const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG)

await roundService.start()
// Wait for the current round to end
await new Promise(resolve => setTimeout(resolve, 2000))

roundService.stop()

const { rows: activeRounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true')
const { rows: allRounds } = await pgPool.query('SELECT * FROM checker_rounds')
assert.strictEqual(activeRounds.length, 1)
assert.strictEqual(allRounds.length, 2)
})
})
})
20 changes: 20 additions & 0 deletions test/test-helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,23 @@ export const postMeasurement = (baseUrl, subnet, measurement) => {
body: JSON.stringify(measurement)
})
}

/**
* @param {object} args
* @param {import('../lib/typings.js').PgPool} args.pgPool
* @param {number} args.roundDurationMs
* @param {boolean} [args.active=false]
*/
export const withRound = async ({ pgPool, roundDurationMs, active = false }) => {
const { rows } = await pgPool.query(`
INSERT INTO checker_rounds (start_time, end_time, active)
VALUES (
NOW(),
NOW() + ($1 || ' milliseconds')::INTERVAL,
$2
)
RETURNING *
`, [roundDurationMs, active])

return rows[0]
}