Skip to content
This repository was archived by the owner on Dec 28, 2022. It is now read-only.

Commit 731e41b

Browse files
authored
Broadcast [0, contig) range on replicator open and add want messages (#114)
* Broadcast `[0, contig)` range on replicator open * Add failing test case * Test ranges as well * First draft of "want" flow * Test discrete ranges * Handle discrete ranges * Reduce test duplication * Test seeks * GC remaining wants when peers are done processing * Only broadcast `contig` if `> 0` * Fix tests * Simplify wants * Send bitfield pages on want * Increase default segment size * Broadcast non-sparse length to peers * Set flag * Brittle workaround
1 parent 84111d8 commit 731e41b

File tree

4 files changed

+136
-37
lines changed

4 files changed

+136
-37
lines changed

index.js

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,18 @@ module.exports = class Hypercore extends EventEmitter {
527527
s.emit('append')
528528
}
529529
}
530+
531+
const contig = this.core.header.contiguousLength
532+
533+
// When the contig length catches up, broadcast the non-sparse length to peers
534+
if (appendedNonSparse && contig === this.core.tree.length) {
535+
for (const peer of this.peers) {
536+
if (peer.broadcastedNonSparse) continue
537+
538+
peer.broadcastRange(0, contig)
539+
peer.broadcastedNonSparse = true
540+
}
541+
}
530542
}
531543

532544
if (bitfield) {
@@ -662,29 +674,25 @@ module.exports = class Hypercore extends EventEmitter {
662674
}
663675

664676
async _get (index, opts) {
665-
let req
677+
let block
666678

667679
if (this.core.bitfield.get(index)) {
668-
req = this.core.blocks.get(index)
680+
block = this.core.blocks.get(index)
669681

670-
if (this.cache) this.cache.set(index, req)
682+
if (this.cache) this.cache.set(index, block)
671683
} else {
672684
if (opts && opts.wait === false) return null
673685
if (opts && opts.onwait) opts.onwait(index, this)
674686
else if (this.onwait) this.onwait(index, this)
675687

676688
const activeRequests = (opts && opts.activeRequests) || this.activeRequests
677689

678-
req = this._cacheOnResolve(
679-
index,
680-
this.replicator
681-
.addBlock(activeRequests, index)
682-
.promise,
683-
this.core.tree.fork
684-
)
690+
const req = this.replicator.addBlock(activeRequests, index)
691+
692+
block = this._cacheOnResolve(index, req.promise, this.core.tree.fork)
685693
}
686694

687-
return req
695+
return block
688696
}
689697

690698
async _cacheOnResolve (index, req, fork) {
@@ -725,7 +733,9 @@ module.exports = class Hypercore extends EventEmitter {
725733

726734
async _download (range) {
727735
if (this.opened === false) await this.opening
736+
728737
const activeRequests = (range && range.activeRequests) || this.activeRequests
738+
729739
return this.replicator.addRange(activeRequests, range)
730740
}
731741

lib/bitfield.js

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,6 @@ module.exports = class Bitfield {
9898
}
9999
}
100100

101-
// Should prob be removed, when/if we re-add compression
102-
page (i) {
103-
const p = this.pages.get(i)
104-
return p ? p.bitfield : new Uint32Array(1024)
105-
}
106-
107101
clear () {
108102
return new Promise((resolve, reject) => {
109103
this.storage.del(0, Infinity, (err) => {

lib/replicator.js

Lines changed: 56 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const m = require('./messages')
77
const caps = require('./caps')
88

99
const DEFAULT_MAX_INFLIGHT = 32
10+
const DEFAULT_SEGMENT_SIZE = 128 * 1024 // 128 KiB
1011

1112
class Attachable {
1213
constructor () {
@@ -286,6 +287,9 @@ class Peer {
286287
this.remoteDownloading = true
287288
this.remoteSynced = false
288289

290+
this.segmentsWanted = new Set()
291+
this.broadcastedNonSparse = false
292+
289293
this.lengthAcked = 0
290294

291295
this.extensions = new Map()
@@ -356,13 +360,13 @@ class Peer {
356360

357361
this.sendSync()
358362

359-
const p = pages(this.core)
363+
const contig = this.core.header.contiguousLength
364+
if (contig > 0) {
365+
this.broadcastRange(0, contig, false)
360366

361-
for (let index = 0; index < p.length; index++) {
362-
this.wireBitfield.send({
363-
start: index * this.core.bitfield.pageSize,
364-
bitfield: p[index]
365-
})
367+
if (contig === this.core.tree.length) {
368+
this.broadcastedNonSparse = true
369+
}
366370
}
367371

368372
this.replicator._ifAvailable--
@@ -552,8 +556,8 @@ class Peer {
552556
this.replicator._onnodata(this, req)
553557
}
554558

555-
onwant () {
556-
// TODO
559+
onwant ({ start, length }) {
560+
this.replicator._onwant(this, start, length)
557561
}
558562

559563
onunwant () {
@@ -672,6 +676,7 @@ class Peer {
672676
return true
673677
}
674678

679+
this._maybeWant(s.seeker.start, len)
675680
return false
676681
}
677682

@@ -683,7 +688,10 @@ class Peer {
683688
_requestBlock (b) {
684689
const { length, fork } = this.core.tree
685690

686-
if (this.remoteBitfield.get(b.index) === false || fork !== this.remoteFork) return false
691+
if (this.remoteBitfield.get(b.index) === false || fork !== this.remoteFork) {
692+
this._maybeWant(b.index)
693+
return false
694+
}
687695

688696
const req = this._makeRequest(b.index >= length)
689697
if (req === null) return false
@@ -741,6 +749,7 @@ class Peer {
741749
return true
742750
}
743751

752+
this._maybeWant(r.start, len)
744753
return false
745754
}
746755

@@ -778,9 +787,25 @@ class Peer {
778787
return true
779788
}
780789

790+
this._maybeWant(f.batch.want.start, len)
781791
return false
782792
}
783793

794+
_maybeWant (start, length = 1) {
795+
let i = Math.floor(start / DEFAULT_SEGMENT_SIZE)
796+
const n = Math.ceil((start + length) / DEFAULT_SEGMENT_SIZE)
797+
798+
for (; i < n; i++) {
799+
if (this.segmentsWanted.has(i)) continue
800+
this.segmentsWanted.add(i)
801+
802+
this.wireWant.send({
803+
start: i * DEFAULT_SEGMENT_SIZE,
804+
length: DEFAULT_SEGMENT_SIZE
805+
})
806+
}
807+
}
808+
784809
async _send (req) {
785810
const fork = this.core.tree.fork
786811

@@ -1297,6 +1322,28 @@ module.exports = class Replicator {
12971322
else this.updatePeer(peer)
12981323
}
12991324

1325+
_onwant (peer, start, length) {
1326+
length = Math.min(length, this.core.tree.length - start)
1327+
1328+
peer.protomux.cork()
1329+
1330+
let i = Math.floor(start / this.core.bitfield.pageSize)
1331+
const n = Math.ceil((start + length) / this.core.bitfield.pageSize)
1332+
1333+
for (; i < n; i++) {
1334+
const p = this.core.bitfield.pages.get(i)
1335+
1336+
if (p) {
1337+
peer.wireBitfield.send({
1338+
start: i * this.core.bitfield.pageSize,
1339+
bitfield: p.bitfield
1340+
})
1341+
}
1342+
}
1343+
1344+
peer.protomux.uncork()
1345+
}
1346+
13001347
async _onreorgdata (peer, req, data) {
13011348
const f = this._addReorg(data.fork, peer)
13021349

@@ -1523,17 +1570,6 @@ module.exports = class Replicator {
15231570
}
15241571
}
15251572

1526-
function pages (core) {
1527-
const res = []
1528-
1529-
for (let i = 0; i < core.tree.length; i += core.bitfield.pageSize) {
1530-
const p = core.bitfield.page(i / core.bitfield.pageSize)
1531-
res.push(p)
1532-
}
1533-
1534-
return res
1535-
}
1536-
15371573
function matchingRequest (req, data) {
15381574
if (data.block !== null && (req.block === null || req.block.index !== data.block.index)) return false
15391575
if (data.hash !== null && (req.hash === null || req.hash.index !== data.hash.index)) return false

test/replicate.js

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -781,3 +781,62 @@ test('non-sparse snapshot during partial replication', async function (t) {
781781
await s.update()
782782
t.is(s.contiguousLength, s.length)
783783
})
784+
785+
test('sparse replication without gossiping', async function (t) {
786+
t.plan(4)
787+
788+
const a = await create()
789+
const b = await create(a.key)
790+
791+
await a.append(['a', 'b', 'c'])
792+
793+
let s
794+
795+
s = replicate(a, b)
796+
await b.download({ start: 0, end: 3 }).downloaded()
797+
await unreplicate(s)
798+
799+
await a.append(['d', 'e', 'f', 'd'])
800+
801+
s = replicate(a, b)
802+
await b.download({ start: 4, end: 7 }).downloaded()
803+
await unreplicate(s)
804+
805+
await t.test('block', async function (t) {
806+
const c = await create(a.key)
807+
808+
s = replicate(b, c)
809+
t.teardown(() => unreplicate(s))
810+
811+
t.alike(await c.get(4), Buffer.from('e'))
812+
})
813+
814+
await t.test('range', async function (t) {
815+
const c = await create(a.key)
816+
817+
s = replicate(b, c)
818+
t.teardown(() => unreplicate(s))
819+
820+
await c.download({ start: 4, end: 6 }).downloaded()
821+
t.pass('resolved')
822+
})
823+
824+
await t.test('discrete range', async function (t) {
825+
const c = await create(a.key)
826+
827+
s = replicate(b, c)
828+
t.teardown(() => unreplicate(s))
829+
830+
await c.download({ blocks: [4, 6] }).downloaded()
831+
t.pass('resolved')
832+
})
833+
834+
await t.test('seek', async function (t) {
835+
const c = await create(a.key)
836+
837+
s = replicate(b, c)
838+
t.teardown(() => unreplicate(s))
839+
840+
t.alike(await c.seek(4), [4, 0])
841+
})
842+
})

0 commit comments

Comments
 (0)