diff --git a/src/core/components/index.js b/src/core/components/index.js index 44d922712a..2bd1c31092 100644 --- a/src/core/components/index.js +++ b/src/core/components/index.js @@ -3,6 +3,11 @@ exports.add = require('./add') exports.config = require('./config') exports.init = require('./init') +exports.repo = { + gc: require('./repo/gc'), + stat: require('./repo/stat'), + version: require('./repo/version') +} exports.start = require('./start') exports.stop = require('./stop') diff --git a/src/core/components/init.js b/src/core/components/init.js index 089d6148dc..90a03542c7 100644 --- a/src/core/components/init.js +++ b/src/core/components/init.js @@ -301,6 +301,16 @@ function createApi ({ add, config: Commands.config({ repo }), init: () => { throw new AlreadyInitializedError() }, + repo: { + // TODO: gc should be available after init + // `resolve` (passed to `refs` API) which is a dependency for `gc` API + // needs to be altered to allow `name` API dependency to be optional, so + // that `resolve` can also be available when not started, and so `gc` can + // be run when not started. + // gc: Commands.repo.gc({ gcLock, pin, pinManager, refs, repo }), + stat: Commands.repo.stat({ repo }), + version: Commands.repo.version({ repo }) + }, start } diff --git a/src/core/components/pin/gc.js b/src/core/components/pin/gc.js deleted file mode 100644 index a974a85de5..0000000000 --- a/src/core/components/pin/gc.js +++ /dev/null @@ -1,153 +0,0 @@ -'use strict' - -const CID = require('cids') -const base32 = require('base32.js') -const callbackify = require('callbackify') -const { cidToString } = require('../../../utils/cid') -const log = require('debug')('ipfs:gc') -const { default: Queue } = require('p-queue') -// TODO: Use exported key from root when upgraded to ipfs-mfs@>=13 -// https://github.com/ipfs/js-ipfs-mfs/pull/58 -const { MFS_ROOT_KEY } = require('ipfs-mfs/src/core/utils/constants') - -const { Errors } = require('interface-datastore') -const ERR_NOT_FOUND = Errors.notFoundError().code - -// Limit on the number of parallel block remove operations -const BLOCK_RM_CONCURRENCY = 256 - -// Perform mark and sweep garbage collection -module.exports = function gc (self) { - return callbackify(async () => { - const start = Date.now() - log('Creating set of marked blocks') - - const release = await self._gcLock.writeLock() - - try { - const [ - blockKeys, markedSet - ] = await Promise.all([ - // Get all blocks keys from the blockstore - self._repo.blocks.query({ keysOnly: true }), - - // Mark all blocks that are being used - createMarkedSet(self) - ]) - - // Delete blocks that are not being used - const res = await deleteUnmarkedBlocks(self, markedSet, blockKeys) - - log(`Complete (${Date.now() - start}ms)`) - - return res - } finally { - release() - } - }) -} - -// Get Set of CIDs of blocks to keep -async function createMarkedSet (ipfs) { - const output = new Set() - - const addPins = pins => { - log(`Found ${pins.length} pinned blocks`) - - pins.forEach(pin => { - output.add(cidToString(new CID(pin), { base: 'base32' })) - }) - } - - await Promise.all([ - // All pins, direct and indirect - ipfs.pin.ls() - .then(pins => pins.map(pin => pin.hash)) - .then(addPins), - - // Blocks used internally by the pinner - ipfs.pin.pinManager.getInternalBlocks() - .then(addPins), - - // The MFS root and all its descendants - ipfs._repo.root.get(MFS_ROOT_KEY) - .then(mh => getDescendants(ipfs, new CID(mh))) - .then(addPins) - .catch(err => { - if (err.code === ERR_NOT_FOUND) { - log('No blocks in MFS') - return [] - } - - throw err - }) - ]) - - return output -} - -// Recursively get descendants of the given CID -async function getDescendants (ipfs, cid) { - const refs = await ipfs.refs(cid, { recursive: true }) - const cids = [cid, ...refs.map(r => new CID(r.ref))] - log(`Found ${cids.length} MFS blocks`) - // log(' ' + cids.join('\n ')) - - return cids -} - -// Delete all blocks that are not marked as in use -async function deleteUnmarkedBlocks (ipfs, markedSet, blockKeys) { - // Iterate through all blocks and find those that are not in the marked set - // The blockKeys variable has the form [ { key: Key() }, { key: Key() }, ... ] - const unreferenced = [] - const result = [] - - const queue = new Queue({ - concurrency: BLOCK_RM_CONCURRENCY - }) - - for await (const { key: k } of blockKeys) { - try { - const cid = dsKeyToCid(k) - const b32 = cid.toV1().toString('base32') - if (!markedSet.has(b32)) { - unreferenced.push(cid) - - queue.add(async () => { - const res = { - cid - } - - try { - await ipfs._repo.blocks.delete(cid) - } catch (err) { - res.err = new Error(`Could not delete block with CID ${cid}: ${err.message}`) - } - - result.push(res) - }) - } - } catch (err) { - const msg = `Could not convert block with key '${k}' to CID` - log(msg, err) - result.push({ err: new Error(msg + `: ${err.message}`) }) - } - } - - await queue.onIdle() - - log(`Marked set has ${markedSet.size} unique blocks. Blockstore has ${blockKeys.length} blocks. ` + - `Deleted ${unreferenced.length} blocks.`) - - return result -} - -// TODO: Use exported utility when upgrade to ipfs-repo@>=0.27.1 -// https://github.com/ipfs/js-ipfs-repo/pull/206 -function dsKeyToCid (key) { - // Block key is of the form / - const decoder = new base32.Decoder() - const buff = decoder.write(key.toString().slice(1)).finalize() - return new CID(Buffer.from(buff)) -} diff --git a/src/core/components/repo/gc.js b/src/core/components/repo/gc.js new file mode 100644 index 0000000000..f5337cec05 --- /dev/null +++ b/src/core/components/repo/gc.js @@ -0,0 +1,110 @@ +'use strict' + +const CID = require('cids') +const { cidToString } = require('../../../utils/cid') +const log = require('debug')('ipfs:repo:gc') +const { MFS_ROOT_KEY } = require('ipfs-mfs') +const Repo = require('ipfs-repo') +const { Errors } = require('interface-datastore') +const ERR_NOT_FOUND = Errors.notFoundError().code +const { parallelMerge, transform, map } = require('streaming-iterables') + +// Limit on the number of parallel block remove operations +const BLOCK_RM_CONCURRENCY = 256 + +// Perform mark and sweep garbage collection +module.exports = ({ gcLock, pin, pinManager, refs, repo }) => { + return async function * gc () { + const start = Date.now() + log('Creating set of marked blocks') + + const release = await gcLock.writeLock() + + try { + // Mark all blocks that are being used + const markedSet = await createMarkedSet({ pin, pinManager, repo }) + // Get all blocks keys from the blockstore + const blockKeys = repo.blocks.query({ keysOnly: true }) + + // Delete blocks that are not being used + yield * deleteUnmarkedBlocks({ repo, refs }, markedSet, blockKeys) + + log(`Complete (${Date.now() - start}ms)`) + } finally { + release() + } + } +} + +// Get Set of CIDs of blocks to keep +async function createMarkedSet ({ pin, pinManager, refs, repo }) { + const pinsSource = map(({ hash }) => hash, pin.ls()) + + const pinInternalsSource = async function * () { + const cids = await pinManager.getInternalBlocks() + yield * cids + } + + const mfsSource = async function * () { + const mh = await repo.root.get(MFS_ROOT_KEY) + const rootCid = new CID(mh) + yield rootCid + try { + for await (const { ref } of refs(rootCid, { recursive: true })) { + yield new CID(ref) + } + } catch (err) { + if (err.code === ERR_NOT_FOUND) { + log('No blocks in MFS') + return + } + throw err + } + } + + const output = new Set() + for await (const cid of parallelMerge(pinsSource, pinInternalsSource, mfsSource)) { + output.add(cidToString(cid, { base: 'base32' })) + } + return output +} + +// Delete all blocks that are not marked as in use +async function * deleteUnmarkedBlocks ({ repo, refs }, markedSet, blockKeys) { + // Iterate through all blocks and find those that are not in the marked set + // blockKeys yields { key: Key() } + let blocksCount = 0 + let removedBlocksCount = 0 + + const removeBlock = async ({ key: k }) => { + blocksCount++ + + try { + const cid = Repo.utils.blockstore.keyToCid(k) + const b32 = cid.toV1().toString('base32') + if (markedSet.has(b32)) return null + const res = { cid } + + try { + await repo.blocks.delete(cid) + removedBlocksCount++ + } catch (err) { + res.err = new Error(`Could not delete block with CID ${cid}: ${err.message}`) + } + + return res + } catch (err) { + const msg = `Could not convert block with key '${k}' to CID` + log(msg, err) + return { err: new Error(msg + `: ${err.message}`) } + } + } + + for await (const res of transform(BLOCK_RM_CONCURRENCY, removeBlock, blockKeys)) { + // filter nulls (blocks that were retained) + if (res) yield res + } + + log(`Marked set has ${markedSet.size} unique blocks. Blockstore has ${blocksCount} blocks. ` + + `Deleted ${removedBlocksCount} blocks.`) +} diff --git a/src/core/components/repo/stat.js b/src/core/components/repo/stat.js new file mode 100644 index 0000000000..d6310c8746 --- /dev/null +++ b/src/core/components/repo/stat.js @@ -0,0 +1,15 @@ +'use strict' + +module.exports = ({ repo }) => { + return async function stat () { + const stats = await repo.stat() + + return { + numObjects: stats.numObjects, + repoSize: stats.repoSize, + repoPath: stats.repoPath, + version: stats.version.toString(), + storageMax: stats.storageMax + } + } +} diff --git a/src/core/components/repo/version.js b/src/core/components/repo/version.js new file mode 100644 index 0000000000..9af7b07735 --- /dev/null +++ b/src/core/components/repo/version.js @@ -0,0 +1,33 @@ +'use strict' + +const { repoVersion } = require('ipfs-repo') + +module.exports = ({ repo }) => { + /** + * If the repo has been initialized, report the current version. + * Otherwise report the version that would be initialized. + * + * @returns {number} + */ + return async function version () { + try { + await repo._checkInitialized() + } catch (err) { + // TODO: (dryajov) This is really hacky, there must be a better way + const match = [ + /Key not found in database \[\/version\]/, + /ENOENT/, + /repo is not initialized yet/ + ].some((m) => { + return m.test(err.message) + }) + if (match) { + // this repo has not been initialized + return repoVersion + } + throw err + } + + return repo.version.get() + } +} diff --git a/src/core/components/start.js b/src/core/components/start.js index f9f41c7458..3407781ec5 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -133,6 +133,14 @@ function createApi ({ add, config: Commands.config({ repo }), init: () => { throw new AlreadyInitializedError() }, + repo: { + // TODO: this PR depends on `refs` refactor and the `pins` refactor + // https://github.com/ipfs/js-ipfs/pull/2658 + // https://github.com/ipfs/js-ipfs/pull/2660 + // gc: Commands.repo.gc({ gcLock, pin, pinManager, refs, repo }), + stat: Commands.repo.stat({ repo }), + version: Commands.repo.version({ repo }) + }, start: () => apiManager.api, stop } diff --git a/src/core/components/stop.js b/src/core/components/stop.js index 4e2a9bb036..da240d555b 100644 --- a/src/core/components/stop.js +++ b/src/core/components/stop.js @@ -100,6 +100,16 @@ function createApi ({ add, config: Commands.config({ repo }), init: () => { throw new AlreadyInitializedError() }, + repo: { + // TODO: gc should be available when stopped + // `resolve` (passed to `refs` API) which is a dependency for `gc` API + // needs to be altered to allow `name` API dependency to be optional, so + // that `resolve` can also be available when not started, and so `gc` can + // be run when not started. + // gc: Commands.repo.gc({ gcLock, pin, pinManager, refs, repo }), + stat: Commands.repo.stat({ repo }), + version: Commands.repo.version({ repo }) + }, start, stop: () => apiManager.api }