Skip to content

Fixes to op version tracking #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"env": {
"es6": true,
"node": true
},
"extends": "eslint:recommended",
"rules": {
"no-console": "off",
"indent": [
"error",
2,
{
"FunctionDeclaration": {
"parameters": "first"
},
"FunctionExpression": {
"parameters": "first"
},
"CallExpression": {
"arguments": 1
},
"SwitchCase": 1
}
],
"quotes": [
"error",
"double",
{
"allowTemplateLiterals": true
}
],
"semi": [
"error",
"always"
]
},
"parserOptions": {
"ecmaVersion": 2017
}
}
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
node_modules

.idea
49 changes: 48 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,59 @@ Doesn't support queries (yet?).
Moderately experimental. (This drives [Synaptograph](https://www.synaptograph.com)'s backend, and [@nornagon](https://github.com/nornagon) hasn't noticed any issues so far.)


## Requirements

Due to the fix to resolve [high concurency issues](https://github.com/share/sharedb-postgres/issues/1) Postgres 9.5+ is now required.

## Migrating older versions

Older versions of this adaptor used the data type json. You will need to alter the data type prior to using if you are upgrading.

```PLpgSQL
ALTER TABLE ops
ALTER COLUMN operation
SET DATA TYPE jsonb
USING operation::jsonb;

ALTER TABLE snapshots
ALTER COLUMN data
SET DATA TYPE jsonb
USING data::jsonb;
```

In addition, the `snapshots` table might have a `NOT NULL` constraint on the `doc_type` and `data` columns that will need to be dropped, since these are set to null when a ShareDB document is deleted via its `.del()` method.

```PLpgSQL
ALTER TABLE snapshots
ALTER COLUMN doc_type
DROP NOT NULL;

ALTER TABLE snapshots
ALTER COLUMN data
DROP NOT NULL;
```

## Usage

`sharedb-postgres` wraps native [node-postgres](https://github.com/brianc/node-postgres), and it supports the same configuration options.

To instantiate a sharedb-postgres wrapper, invoke the module and pass in your
PostgreSQL configuration as an argument. For example:
PostgreSQL configuration as an argument or use environmental arguments.

For example using environmental arugments:

```js
var db = require('sharedb-postgres')();
var backend = require('sharedb')({db: db})
```

Then executing via the command line

```
PGUSER=dbuser PGPASSWORD=secretpassword PGHOST=database.server.com PGDATABASE=mydb PGPORT=5433 npm start
```

Example using an object

```js
var db = require('sharedb-postgres')({host: 'localhost', database: 'mydb'});
Expand Down
198 changes: 100 additions & 98 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
var DB = require('sharedb').DB;
var pg = require('pg');
var DB = require("sharedb").DB;
var pg = require("pg");

// Postgres-backed ShareDB database

Expand All @@ -15,22 +15,17 @@ module.exports = PostgresDB;

PostgresDB.prototype = Object.create(DB.prototype);

PostgresDB.prototype.close = function(callback) {
PostgresDB.prototype.close = function (callback) {
this.closed = true;
this.pool.end();

if (callback) callback();
};

function rollback(client, done) {
client.query('ROLLBACK', function(err) {
return done(err);
})
}

// Persists an op and snapshot if it is for the next version. Calls back with
// callback(err, succeeded)
PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, callback) {
PostgresDB.prototype.commit = function (collection, id, op, snapshot, options, callback) {
/*
* op: CreateOp {
* src: '24545654654646',
Expand All @@ -41,114 +36,118 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
* }
* snapshot: PostgresSnapshot
*/
this.pool.connect(function(err, client, done) {
this.pool.connect((err, client, done) => {
if (err) {
done(client);
callback(err);
return;
}
function commit() {
client.query('COMMIT', function(err) {
done(err);
if (err) {
callback(err);
} else {
callback(null, true);
}
})
}
client.query(
'SELECT max(version) AS max_version FROM ops WHERE collection = $1 AND doc_id = $2',
[collection, id],
function(err, res) {
var max_version = res.rows[0].max_version;
if (max_version == null)
max_version = 0;
if (snapshot.v !== max_version + 1) {
return callback(null, false);
}
client.query('BEGIN', function(err) {
client.query(
'INSERT INTO ops (collection, doc_id, version, operation) VALUES ($1, $2, $3, $4)',
[collection, id, snapshot.v, op],
function(err, res) {
if (err) {
// TODO: if err is "constraint violation", callback(null, false) instead
rollback(client, done);
callback(err);
return;
}
if (snapshot.v === 1) {
client.query(
'INSERT INTO snapshots (collection, doc_id, doc_type, version, data) VALUES ($1, $2, $3, $4, $5)',
[collection, id, snapshot.type, snapshot.v, snapshot.data],
function(err, res) {
// TODO:
// if the insert was successful and did insert, callback(null, true)
// if the insert was successful and did not insert, callback(null, false)
// if there was an error, rollback and callback(error)
if (err) {
rollback(client, done);
callback(err);
return;
}
commit();
}
)
} else {
client.query(
'UPDATE snapshots SET doc_type = $3, version = $4, data = $5 WHERE collection = $1 AND doc_id = $2 AND version = ($4 - 1)',
[collection, id, snapshot.type, snapshot.v, snapshot.data],
function(err, res) {
// TODO:
// if any rows were updated, success
// if 0 rows were updated, rollback and not success
// if error, rollback and not success
if (err) {
rollback(client, done);
callback(err);
return;
}
commit();
}
)
}
}
)
})

/*
* This query uses common table expression to upsert the snapshot table
* (iff the new version is exactly 1 more than the latest table or if
* the document id does not exists)
*
* It will then insert into the ops table if it is exactly 1 more than the
* latest table or it the first operation and iff the previous insert into
* the snapshot table is successful.
*
* This result of this query the version of the newly inserted operation
* If either the ops or the snapshot insert fails then 0 rows are returned
*
* If 0 zeros are return then the callback must return false
*
* Casting is required as postgres thinks that collection and doc_id are
* not varchar
*/
// ZW: We also should have the first op version be 0 (the actual value
// of op.v) instead of 1, in order to match the reference MemoryDB
// implementation. Catching up outdated clients who reconnect may be
// buggy otherwise.
const query = {
name: "sdb-commit-op-and-snap",
text: `WITH snapshot_id AS (
INSERT INTO snapshots (collection, doc_id, doc_type, version, data)
SELECT $1::varchar collection, $2::varchar doc_id,
$3 snap_type, $4 snap_v, $5 snap_data
WHERE $4 = (
SELECT version+1 snap_v
FROM snapshots
WHERE collection = $1 AND doc_id = $2
FOR UPDATE
) OR NOT EXISTS (
SELECT 1
FROM snapshots
WHERE collection = $1 AND doc_id = $2
FOR UPDATE
)
ON CONFLICT (collection, doc_id) DO
UPDATE SET doc_type = $3, version = $4, data = $5
RETURNING version
)
INSERT INTO ops (collection, doc_id, version, operation)
SELECT $1::varchar collection, $2::varchar doc_id,
$6 op_v, $7 op
WHERE (
$6 = (
SELECT max(version)+1
FROM ops
WHERE collection = $1 AND doc_id = $2
) OR NOT EXISTS (
SELECT 1
FROM ops
WHERE collection = $1 AND doc_id = $2
)
) AND EXISTS (SELECT 1 FROM snapshot_id)
RETURNING version`,
values: [
collection, id, snapshot.type, snapshot.v, snapshot.data, op.v, op
]
};
client.query(query, (err, res) => {
if (err) {
callback(err);
} else if (res.rows.length === 0) {
done(client);
callback(null, false);
}
)
})
else {
done(client);
callback(null, true);
}
});

});
};

// Get the named document from the database. The callback is called with (err,
// snapshot). A snapshot with a version of zero is returned if the docuemnt
// has never been created in the database.
PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, callback) {
this.pool.connect(function(err, client, done) {
PostgresDB.prototype.getSnapshot = function (collection, id, fields, options, callback) {
this.pool.connect(function (err, client, done) {
if (err) {
done(client);
callback(err);
return;
}
client.query(
'SELECT version, data, doc_type FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1',
"SELECT version, data, doc_type FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1",
[collection, id],
function(err, res) {
function (err, res) {
done();
if (err) {
callback(err);
return;
}
if (res.rows.length) {
var row = res.rows[0]
var row = res.rows[0];
var snapshot = new PostgresSnapshot(
id,
row.version,
row.doc_type,
row.data,
undefined // TODO: metadata
)
);
callback(null, snapshot);
} else {
var snapshot = new PostgresSnapshot(
Expand All @@ -157,12 +156,12 @@ PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, cal
null,
undefined,
undefined
)
);
callback(null, snapshot);
}
}
)
})
);
});
};

// Get operations between [from, to) noninclusively. (Ie, the range should
Expand All @@ -174,28 +173,31 @@ PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, cal
// The version will be inferred from the parameters if it is missing.
//
// Callback should be called as callback(error, [list of ops]);
PostgresDB.prototype.getOps = function(collection, id, from, to, options, callback) {
this.pool.connect(function(err, client, done) {
PostgresDB.prototype.getOps = function (collection, id, from, to, options, callback) {
this.pool.connect(function (err, client, done) {
if (err) {
done(client);
callback(err);
return;
}

// ZW: Add explicit row ordering here
client.query(
'SELECT version, operation FROM ops WHERE collection = $1 AND doc_id = $2 AND version >= $3 AND version < $4',
"SELECT version, operation FROM ops WHERE collection = $1 AND doc_id =" +
" $2 AND version >= $3 AND version < $4 ORDER BY version ASC",
[collection, id, from, to],
function(err, res) {
function (err, res) {
done();
if (err) {
callback(err);
return;
}
callback(null, res.rows.map(function(row) {
callback(null, res.rows.map(function (row) {
return row.operation;
}));
}
)
})
);
});
};

function PostgresSnapshot(id, version, type, data, meta) {
Expand Down
Loading