Skip to content

💥 Comply with sharedb spec #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
name: Test

on:
push:
branches:
- main
- setup-ci # TODO: Remove
pull_request:
branches:
- main

jobs:
test:
name: Node.js ${{ matrix.node }} + PostgreSQL ${{ matrix.postgres }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
node:
- 16
- 18
- 20
postgres:
- 13
- 14
- 15
- 16
services:
postgres:
image: postgres:${{ matrix.postgres }}
env:
POSTGRES_HOST_AUTH_METHOD: trust
POSTGRES_DB: postgres
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
timeout-minutes: 10
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: ${{ matrix.node }}
- name: Install
run: npm install
- name: Test
run: npm test
env:
PGUSER: postgres
PGPASSWORD: postgres
PGDATABASE: postgres
5 changes: 5 additions & 0 deletions .mocharc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module.exports = {
timeout: 5_000,
file: './test/setup.js',
spec: '**/*.spec.js',
};
179 changes: 74 additions & 105 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
var DB = require('sharedb').DB;
var pg = require('pg');

const PG_UNIQUE_VIOLATION = '23505';

// Postgres-backed ShareDB database

function PostgresDB(options) {
Expand All @@ -9,24 +11,32 @@ function PostgresDB(options) {

this.closed = false;

this.pg_config = options;
this.pool = new pg.Pool(options);
this._pool = new pg.Pool(options);
};
module.exports = PostgresDB;

PostgresDB.prototype = Object.create(DB.prototype);

PostgresDB.prototype.close = function(callback) {
this.closed = true;
this.pool.end();

if (callback) callback();
PostgresDB.prototype.close = async function(callback) {
let error;
try {
if (!this.closed) {
this.closed = true;
await this._pool.end();
}
} catch (err) {
error = err;
}

// FIXME: Don't swallow errors. Emit 'error' event?
if (callback) callback(error);
};


// Persists an op and snapshot if it is for the next version. Calls back with
// callback(err, succeeded)
PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, callback) {
PostgresDB.prototype.commit = async function(collection, id, op, snapshot, options, callback) {
try {
/*
* op: CreateOp {
* src: '24545654654646',
Expand All @@ -37,34 +47,28 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
* }
* snapshot: PostgresSnapshot
*/
this.pool.connect((err, client, done) => {
if (err) {
done(client);
callback(err);
return;
}
/*
* This query uses common table expression to upsert the snapshot table
* This query uses common table expression to upsert the snapshot table
* (iff the new version is exactly 1 more than the latest table or if
* the document id does not exists)
*
* It will then insert into the ops table if it is exactly 1 more than the
* It will then insert into the ops table if it is exactly 1 more than the
* latest table or it the first operation and iff the previous insert into
* the snapshot table is successful.
*
* This result of this query the version of the newly inserted operation
* If either the ops or the snapshot insert fails then 0 rows are returned
*
* If 0 zeros are return then the callback must return false
* If 0 zeros are return then the callback must return false
*
* Casting is required as postgres thinks that collection and doc_id are
* not varchar
*/
* not varchar
*/
const query = {
name: 'sdb-commit-op-and-snap',
text: `WITH snapshot_id AS (
INSERT INTO snapshots (collection, doc_id, doc_type, version, data)
SELECT $1::varchar collection, $2::varchar doc_id, $4 doc_type, $3 v, $5 d
INSERT INTO snapshots (collection, doc_id, version, doc_type, data, metadata)
SELECT $1::varchar collection, $2::varchar doc_id, $3 v, $4 doc_type, $5 d, $6 m
WHERE $3 = (
SELECT version+1 v
FROM snapshots
Expand All @@ -76,11 +80,11 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
WHERE collection = $1 AND doc_id = $2
FOR UPDATE
)
ON CONFLICT (collection, doc_id) DO UPDATE SET version = $3, data = $5, doc_type = $4
ON CONFLICT (collection, doc_id) DO UPDATE SET version = $3, data = $5, doc_type = $4, metadata = $5
RETURNING version
)
INSERT INTO ops (collection, doc_id, version, operation)
SELECT $1::varchar collection, $2::varchar doc_id, $3 v, $6 operation
SELECT $1::varchar collection, $2::varchar doc_id, $3 v, $7 operation
WHERE (
$3 = (
SELECT max(version)+1
Expand All @@ -93,66 +97,47 @@ WHERE (
)
) AND EXISTS (SELECT 1 FROM snapshot_id)
RETURNING version`,
values: [collection,id,snapshot.v, snapshot.type, snapshot.data,op]
values: [collection, id, snapshot.v, snapshot.type, JSON.stringify(snapshot.data), JSON.stringify(snapshot.m), JSON.stringify(op)]
}
client.query(query, (err, res) => {
if (err) {
callback(err)
} else if(res.rows.length === 0) {
done(client);
callback(null,false)
}
else {
done(client);
callback(null,true)
}
})

})
const result = await this._pool.query(query);
const success = result.rowCount > 0;
callback(null, success);
} catch (error) {
// Return non-success instead of duplicate key error, since this is
// expected to occur during simultaneous creates on the same id
if (error.code === PG_UNIQUE_VIOLATION) callback(null, false);
else callback(error);
}
};

// Get the named document from the database. The callback is called with (err,
// snapshot). A snapshot with a version of zero is returned if the docuemnt
// has never been created in the database.
PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, callback) {
this.pool.connect(function(err, client, done) {
if (err) {
done(client);
callback(err);
return;
}
client.query(
'SELECT version, data, doc_type FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1',
PostgresDB.prototype.getSnapshot = async function(collection, id, fields, options, callback) {
fields ||= {};
options ||= {};
const wantsMetadata = fields.$submit || options.metadata;
try {
const result = await this._pool.query(
'SELECT version, data, doc_type, metadata FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1',
[collection, id],
function(err, res) {
done();
if (err) {
callback(err);
return;
}
if (res.rows.length) {
var row = res.rows[0]
var snapshot = new PostgresSnapshot(
id,
row.version,
row.doc_type,
row.data,
undefined // TODO: metadata
)
callback(null, snapshot);
} else {
var snapshot = new PostgresSnapshot(
id,
0,
null,
undefined,
undefined
)
callback(null, snapshot);
}
}
)
})
);

var row = result.rows[0]
const snapshot = {
id,
v: row?.version || 0,
type: row?.doc_type || null,
data: row?.data || undefined,
m: wantsMetadata ?
// Postgres returns null but ShareDB expects undefined
(row?.metadata || undefined) :
null,
};
callback(null, snapshot);
} catch (error) {
callback(error);
}
};

// Get operations between [from, to) noninclusively. (Ie, the range should
Expand All @@ -164,37 +149,21 @@ PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, cal
// The version will be inferred from the parameters if it is missing.
//
// Callback should be called as callback(error, [list of ops]);
PostgresDB.prototype.getOps = function(collection, id, from, to, options, callback) {
this.pool.connect(function(err, client, done) {
if (err) {
done(client);
callback(err);
return;
}

PostgresDB.prototype.getOps = async function(collection, id, from, to, options, callback) {
from ||= 0;
options ||= {};
const wantsMetadata = options.metadata;
try {
var cmd = 'SELECT version, operation FROM ops WHERE collection = $1 AND doc_id = $2 AND version > $3 ';
var params = [collection, id, from];
if(to || to == 0) { cmd += ' AND version <= $4'; params.push(to)}
cmd += ' order by version';
client.query( cmd, params,
function(err, res) {
done();
if (err) {
callback(err);
return;
}
callback(null, res.rows.map(function(row) {
return row.operation;
}));
}
)
})
const result = await this._pool.query(cmd, params);
callback(null, result.rows.map(({operation}) => {
if (!wantsMetadata) delete operation.m;
return operation;
}));
} catch (error) {
callback(error);
}
};

function PostgresSnapshot(id, version, type, data, meta) {
this.id = id;
this.v = version;
this.type = type;
this.data = data;
this.m = meta;
}
37 changes: 37 additions & 0 deletions index.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
const PostgresDB = require('.');
const {Pool} = require('pg');
const fs = require('node:fs');

const DB_NAME = 'sharedbtest';

function create(callback) {
var db = new PostgresDB({database: DB_NAME});
callback(null, db);
};

describe('PostgresDB', function() {
let pool;
let client;

beforeEach(async () => {
pool = new Pool({database: 'postgres'});
client = await pool.connect();
await client.query(`DROP DATABASE IF EXISTS ${DB_NAME}`);
await client.query(`CREATE DATABASE ${DB_NAME}`);

const testPool = new Pool({database: DB_NAME});
const testClient = await testPool.connect();
const structure = fs.readFileSync('./structure.sql', 'utf8');
await testClient.query(structure);
await testClient.release(true);
await testPool.end();
});

afterEach(async function() {
await client.query(`DROP DATABASE IF EXISTS ${DB_NAME}`);
await client.release(true);
await pool.end();
});

require('sharedb/test/db')({create: create});
});
12 changes: 9 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"description": "PostgreSQL adapter for ShareDB. forked from share/sharedb-postgres",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
"test": "mocha"
},
"author": "Jeremy Apthorp <[email protected]>",
"license": "MIT",
Expand All @@ -19,8 +19,14 @@
"url": "https://github.com/share/sharedb-postgres"
},
"dependencies": {
"pg": "^8.5.1",
"pg-pool": "^3.2.1",
"pg": "^8.12.0",
"sharedb": "^1.6.0 || ^2.0.0 || ^3.0.0 || ^4.0.0 || ^5.0.0"
},
"devDependencies": {
"chai": "^4.4.1",
"mocha": "^10.4.0",
"ot-json1": "^1.0.2",
"rich-text": "^4.1.0",
"sinon": "^18.0.0"
}
}
Loading