diff --git a/cedar/args.sh b/cedar/args.sh index 6bebda8f..bcd373d9 100644 --- a/cedar/args.sh +++ b/cedar/args.sh @@ -1,5 +1,5 @@ -INSTANCE_NAME="cedar-test-db" +INSTANCE_NAME="cedar-test-db-mb170325" TIER="db-f1-micro" REGION="us-central1" DB_NAME="activitypub" -BUCKET_NAME="cedar-test-data" +BUCKET_NAME="cedar-test-data-mb170325" diff --git a/cedar/data-generation/Dockerfile b/cedar/data-generation/Dockerfile index 3eef44b0..84f9fb9c 100644 --- a/cedar/data-generation/Dockerfile +++ b/cedar/data-generation/Dockerfile @@ -13,5 +13,7 @@ COPY generate-follows.js . COPY generate-follows-worker.js . COPY generate-feeds.js . COPY generate-feeds-worker.js . +COPY generate-follows-notifications.js . +COPY generate-follows-notifications-worker.js . CMD ["/bin/bash", "data-generate.sh"] diff --git a/cedar/data-generation/README.md b/cedar/data-generation/README.md index 13b0d0f4..eb04bba6 100644 --- a/cedar/data-generation/README.md +++ b/cedar/data-generation/README.md @@ -47,3 +47,24 @@ in lots of large csv files), so we have to do the following: We load `users` and `posts` into memory as we need to do a lookup for each of the follows. This works ok for the max dataset we are generating, but could be problematic for larger datasets. 
+ +### Notifications Generation + +To generate notifications for follows, we need to: + +- Find the ID of the account associated with a user +- Find all the accounts that follow the user's account +- Insert into the notifications table an entry for each of the resolved accounts + +Finding all of the accounts that follow a user's account is difficult because +we can't load all of the follow relationships into memory (as they are contained +in lots of large csv files), so we have to do the following: + +- Open a single follows csv file at a time +- Read through each line to get the follower and following IDs +- Find the user associated with the following ID +- Insert into the notifications table an entry for each of the resolved accounts + +We load `users` into memory as we need to do a lookup for each of +the follows. This works ok for the max dataset we are generating, but could be +problematic for larger datasets. diff --git a/cedar/data-generation/config.js b/cedar/data-generation/config.js index 1d8066d1..b29d00df 100644 --- a/cedar/data-generation/config.js +++ b/cedar/data-generation/config.js @@ -1,4 +1,4 @@ -const SCALING_FACTOR = 1; +const SCALING_FACTOR = 0.01; const DATA_DIR = process.env.DATA_DIR || './data'; diff --git a/cedar/data-generation/data-generate.sh b/cedar/data-generation/data-generate.sh index 0aa83941..ab67eca0 100755 --- a/cedar/data-generation/data-generate.sh +++ b/cedar/data-generation/data-generate.sh @@ -9,3 +9,4 @@ mkdir -p $DATA_DIR node generate-data.js node generate-follows.js node generate-feeds.js +node generate-follows-notifications.js diff --git a/cedar/data-generation/data-stats.sh b/cedar/data-generation/data-stats.sh index 221493c6..20848f08 100755 --- a/cedar/data-generation/data-stats.sh +++ b/cedar/data-generation/data-stats.sh @@ -8,9 +8,10 @@ count_lines() { fi } -echo "Sites = $(count_lines './data/sites.csv')" -echo "Accounts = $(count_lines './data/accounts.csv')" -echo "Users = $(count_lines 
'./data/users.csv')" -echo "Posts = $(count_lines './data/posts.csv')" -echo "Follows = $(count_lines './data/follows_*.csv')" -echo "Feeds = $(count_lines './data/feeds_*.csv')" +echo "Sites = $(count_lines './data/sites.csv')" +echo "Accounts = $(count_lines './data/accounts.csv')" +echo "Users = $(count_lines './data/users.csv')" +echo "Posts = $(count_lines './data/posts.csv')" +echo "Follows = $(count_lines './data/follows_*.csv')" +echo "Feeds = $(count_lines './data/feeds_*.csv')" +echo "Notifications = $(count_lines './data/follows_notifications_*.csv')" diff --git a/cedar/data-generation/data-upload.sh b/cedar/data-generation/data-upload.sh index 242baa95..49dc4855 100644 --- a/cedar/data-generation/data-upload.sh +++ b/cedar/data-generation/data-upload.sh @@ -34,3 +34,14 @@ for file in $(gsutil ls $DATA_DIR | grep 'feeds_.*\.csv'); do --quiet \ $file done + +for file in $(gsutil ls $DATA_DIR | grep 'follows_notifications_.*\.csv'); do + echo "Importing $file..." + + gcloud sql import csv $INSTANCE \ + --database=$DATABASE \ + --table=notifications \ + --columns=user_id,account_id,event_type \ + --quiet \ + $file +done diff --git a/cedar/data-generation/generate-follows-notifications-worker.js b/cedar/data-generation/generate-follows-notifications-worker.js new file mode 100644 index 00000000..bd9b0596 --- /dev/null +++ b/cedar/data-generation/generate-follows-notifications-worker.js @@ -0,0 +1,92 @@ +const { parentPort, workerData } = require('node:worker_threads'); +const fs = require('node:fs'); +const readline = require('node:readline'); +const path = require('node:path'); + +const { DATA_DIR } = require('./config.js'); + +const CHUNK_SIZE = 1000; // Write to disk in chunks of 1000 lines + +function parseCSV(data) { + const lines = data.trim().split('\n'); + const headers = lines[0].split(','); + + return lines.slice(1).map((line) => { + const values = line.split(','); + + return headers.reduce((object, header, index) => { + object[header] = 
values[index]; + + return object; + }, {}); + }); +} + +async function generateFollowsNotifications(followsFiles) { + const users = []; + + const usersData = fs.readFileSync( + path.join(DATA_DIR, 'users.csv'), + 'utf-8', + ); + + for (const user of parseCSV(usersData)) { + users[user.account_id] = user; + } + + for (const file of followsFiles) { + const filePath = path.join(DATA_DIR, file); + + const readStream = fs.createReadStream(filePath); + const writeStream = fs.createWriteStream( + filePath.replace('follows', 'follows_notifications'), + ); + const rl = readline.createInterface({ + input: readStream, + crlfDelay: Number.POSITIVE_INFINITY, + }); + + writeStream.write('user_id,account_id,event_type\n'); + + let batch = []; + let isFirstLine = true; + + for await (const line of rl) { + if (isFirstLine) { + isFirstLine = false; + + continue; + } + + const [followerId, followingId] = line.split(','); + const user = users[followingId]; + + // Not all accounts are associated with a user + if (!user) { + continue; + } + + batch.push(`${user.internal_id},${followerId},4`); + + if (batch.length >= CHUNK_SIZE) { + writeStream.write(`${batch.join('\n')}\n`); + + batch = []; + } + } + + if (batch.length > 0) { + writeStream.write(`${batch.join('\n')}\n`); + } + + await new Promise((resolve) => writeStream.end(resolve)); + + rl.close(); + + parentPort.postMessage({ type: 'progress', value: 1 }); + } + + parentPort.postMessage({ type: 'done' }); +} + +generateFollowsNotifications(workerData.followsFiles); diff --git a/cedar/data-generation/generate-follows-notifications.js b/cedar/data-generation/generate-follows-notifications.js new file mode 100644 index 00000000..5d8a93d6 --- /dev/null +++ b/cedar/data-generation/generate-follows-notifications.js @@ -0,0 +1,99 @@ +const { Worker } = require('node:worker_threads'); +const cliProgress = require('cli-progress'); +const fs = require('node:fs'); +const os = require('node:os'); + +const { DATA_DIR } = require('./config.js'); 
+ +const NUM_WORKERS = os.cpus().length; + +const progressBar = new cliProgress.SingleBar( + { + clearOnComplete: false, + hideCursor: true, + format: '{bar} {percentage}%', + }, + cliProgress.Presets.shades_classic, +); + +async function main() { + const startTime = Date.now(); + + const followsFiles = fs + .readdirSync(DATA_DIR) + .filter((file) => file.startsWith('follows_') && file.endsWith('.csv')); + + const workers = []; + const filesPerWorker = Math.ceil(followsFiles.length / NUM_WORKERS); + + for (let i = 0; i < NUM_WORKERS; i++) { + const start = i * filesPerWorker; + const end = Math.min(start + filesPerWorker, followsFiles.length); + + const worker = new Worker( + './generate-follows-notifications-worker.js', + { + workerData: { + followsFiles: followsFiles.slice(start, end), + }, + }, + ); + + workers.push(worker); + } + + console.log( + `Generating follows notifications from ${followsFiles.length} follows csv files across ${NUM_WORKERS} workers\n`, + ); + + progressBar.start(followsFiles.length, 0); + + await Promise.all( + workers.map( + (worker) => + new Promise((resolve, reject) => { + worker.on('message', ({ type, value }) => { + if (type === 'progress') { + progressBar.increment(value); + } + + if (type === 'done') { + resolve(); + } + }); + + worker.on('error', reject); + + worker.on('exit', (code) => { + if (code !== 0) { + reject( + new Error( + `Worker stopped with exit code ${code}`, + ), + ); + } + }); + }), + ), + ); + + progressBar.stop(); + + const endTime = Date.now(); + const duration = (endTime - startTime) / 1000; + const minutes = Math.floor(duration / 60); + const seconds = (duration % 60).toFixed(0); + + console.log( + `\n✅ Follows notifications generation completed in ${minutes}m ${seconds}s`, + ); +} + +main() + .then(() => { + process.exit(0); + }) + .catch((error) => { + console.error(error); + process.exit(1); + }); diff --git a/cedar/query-runner/queries/read-notifications.sql 
b/cedar/query-runner/queries/read-notifications.sql new file mode 100644 index 00000000..1407860a --- /dev/null +++ b/cedar/query-runner/queries/read-notifications.sql @@ -0,0 +1,31 @@ +SELECT + notifications.internal_id AS notification_internal_id, + notifications.created_at AS notification_created_at, + notifications.event_type AS notification_event_type, + notifications.user_id AS notification_user_id, + notifications.account_id AS notification_account_id, + notifications.post_id AS notification_post_id, + notifications.reply_post_id AS notification_reply_post_id, + accounts.name AS account_name, + accounts.username AS account_username, + accounts.description AS account_description, + accounts.icon AS account_icon, + posts.title AS post_title, + posts.content AS post_content, + posts.type AS post_type +FROM + notifications +LEFT JOIN + posts on posts.internal_id = notifications.post_id +LEFT JOIN + posts AS reply_posts on reply_posts.internal_id = notifications.reply_post_id +INNER JOIN + accounts on accounts.internal_id = notifications.account_id +WHERE -- We can only filter on columns in `notifications` table + notifications.user_id = 2340 +AND + notifications.internal_id < 20 -- the cursor for pagination +ORDER BY -- We can only order on columns in `notifications table` + notifications.internal_id DESC +LIMIT + 20; diff --git a/cedar/schema/indexes.sql b/cedar/schema/indexes.sql index 88782db1..76bc9cd3 100644 --- a/cedar/schema/indexes.sql +++ b/cedar/schema/indexes.sql @@ -27,3 +27,6 @@ CREATE INDEX idx_likes_post_id ON likes (post_id); -- Lookup followers in both directions CREATE INDEX idx_follows_follower_id_following_id ON follows (follower_id, following_id); CREATE INDEX idx_follows_following_id_follower_id ON follows (following_id, follower_id); + +-- Lookup notifications for a user +CREATE INDEX idx_notifications_user_id ON notifications (user_id); diff --git a/cedar/schema/tables.sql b/cedar/schema/tables.sql index 47b2500e..6310fffd 100644 --- 
a/cedar/schema/tables.sql +++ b/cedar/schema/tables.sql @@ -66,3 +66,17 @@ CREATE TABLE follows ( FOREIGN KEY (follower_id) REFERENCES accounts(internal_id) ON DELETE CASCADE, FOREIGN KEY (following_id) REFERENCES accounts(internal_id) ON DELETE CASCADE ); + +CREATE TABLE notifications ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + event_type TINYINT UNSIGNED NOT NULL, -- 1=LIKE, 2=REPOST, 3=REPLY, 4=FOLLOW + user_id INT NOT NULL, -- The user the notification is for + account_id INT NOT NULL, -- The account that "did" the notification + post_id INT NULL, -- NULL for FOLLOW events + reply_post_id INT NULL, -- NULL for non-reply events + FOREIGN KEY (user_id) REFERENCES users(internal_id) ON DELETE CASCADE, + FOREIGN KEY (account_id) REFERENCES accounts(internal_id) ON DELETE CASCADE, + FOREIGN KEY (post_id) REFERENCES posts(internal_id) ON DELETE CASCADE, + FOREIGN KEY (reply_post_id) REFERENCES posts(internal_id) ON DELETE CASCADE +); diff --git a/cedar/upload-data.sh b/cedar/upload-data.sh index 900f290b..06b453fa 100755 --- a/cedar/upload-data.sh +++ b/cedar/upload-data.sh @@ -4,31 +4,42 @@ source ./args.sh DATA_DIR=gs://$BUCKET_NAME/csv -gsutil -m cp ./data-generation/data/gz/* $DATA_DIR - -gcloud sql import csv $INSTANCE_NAME $DATA_DIR/sites.csv.gz --quiet --database=$DB_NAME --table=sites --columns=internal_id,host,webhook_secret -gcloud sql import csv $INSTANCE_NAME $DATA_DIR/accounts.csv.gz --quiet --database=$DB_NAME --table=accounts --columns=internal_id,name,username,description,icon -gcloud sql import csv $INSTANCE_NAME $DATA_DIR/users.csv.gz --quiet --database=$DB_NAME --table=users --columns=internal_id,account_id,site_id -gcloud sql import csv $INSTANCE_NAME $DATA_DIR/posts.csv.gz --quiet --database=$DB_NAME --table=posts --columns=internal_id,title,content,author_id,type - -for file in $(gsutil ls $DATA_DIR | grep 'follows_.*\.csv.gz'); do - echo "Importing $file..." 
- - gcloud sql import csv $INSTANCE_NAME \ - --database=$DB_NAME \ - --table=follows \ - --columns=follower_id,following_id \ - --quiet \ - "$file" -done - -for file in $(gsutil ls $DATA_DIR | grep 'feeds_.*\.csv.gz'); do +gsutil -m cp ./data-generation/data/gz/follows_notifications_*.csv.gz $DATA_DIR + +# gcloud sql import csv $INSTANCE_NAME $DATA_DIR/sites.csv.gz --quiet --database=$DB_NAME --table=sites --columns=internal_id,host,webhook_secret +# gcloud sql import csv $INSTANCE_NAME $DATA_DIR/accounts.csv.gz --quiet --database=$DB_NAME --table=accounts --columns=internal_id,name,username,description,icon +# gcloud sql import csv $INSTANCE_NAME $DATA_DIR/users.csv.gz --quiet --database=$DB_NAME --table=users --columns=internal_id,account_id,site_id +# gcloud sql import csv $INSTANCE_NAME $DATA_DIR/posts.csv.gz --quiet --database=$DB_NAME --table=posts --columns=internal_id,title,content,author_id,type + +# for file in $(gsutil ls $DATA_DIR | grep 'follows_.*\.csv.gz'); do +# echo "Importing $file..." + +# gcloud sql import csv $INSTANCE_NAME \ +# --database=$DB_NAME \ +# --table=follows \ +# --columns=follower_id,following_id \ +# --quiet \ +# "$file" +# done + +# for file in $(gsutil ls $DATA_DIR | grep 'feeds_.*\.csv.gz'); do +# echo "Importing $file..." + +# gcloud sql import csv $INSTANCE_NAME \ +# --database=$DB_NAME \ +# --table=feeds \ +# --columns=user_id,post_id,author_id,type \ +# --quiet \ +# $file +# done + +for file in $(gsutil ls $DATA_DIR | grep 'follows_notifications_.*\.csv.gz'); do echo "Importing $file..." gcloud sql import csv $INSTANCE_NAME \ --database=$DB_NAME \ - --table=feeds \ - --columns=user_id,post_id,author_id,type \ + --table=notifications \ + --columns=user_id,account_id,event_type \ --quiet \ $file done