Metrics #2518

Merged: 11 commits, Jan 18, 2021

1 change: 1 addition & 0 deletions ansible/inventory.yml
@@ -14,6 +14,7 @@ hosts:
ubuntu1604-x64-1: {ip: 138.197.224.240, alias: www}
ubuntu1804-x64-1: {ip: 178.128.202.158, alias: gzemnid}
ubuntu1804-x64-2: {ip: 45.55.45.227, alias: unofficial-builds}
ubuntu1804-x64-3: {ip: 157.245.7.159, alias: metrics}

- joyent:
smartos15-x64-1: {ip: 165.225.151.21, alias: backup}
12 changes: 12 additions & 0 deletions ansible/playbooks/create-metrics.yml
@@ -0,0 +1,12 @@
---

#
# sets up the host that runs metrics collection and storage
#

- hosts: infra-digitalocean-ubuntu1804-x64-3
roles:
- bootstrap
- package-upgrade
- baselayout
- metrics
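
A minimal sketch of running this playbook against the new host (the exact invocation may vary with local Ansible setup):

```sh
cd ansible
ansible-playbook playbooks/create-metrics.yml
```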
17 changes: 17 additions & 0 deletions ansible/roles/metrics/README.md
@@ -0,0 +1,17 @@
### Current Workflow
* The logs from Cloudflare come in five-minute intervals (e.g. `20210105/20210105T141000Z_20210105T141500Z_adbdac1f.log.gz` contains every log from 14:10 to 14:15 on 2021/01/05 UTC)
* Every time a new log file is created it is automatically pushed to the GCP bucket `cloudflare-logs-nodejs`
* When a new file is uploaded to `cloudflare-logs-nodejs` it triggers a "notification", which sends a message to the `cloudflare-logs` topic
* The subscription `process-logs` listens to the `cloudflare-logs` topic; when it receives a new message it takes the message metadata and sends an authenticated HTTP request to the URL of the Cloud Run instance (see the example payload below)
* The Cloud Run instance is where the processing of the data takes place. It runs a Docker image of `files/process-cloudflare` with `npm run processLogs` as its entry point. When the instance receives an HTTP request it extracts the filename and bucket name from the metadata and downloads the file into memory. It then parses the data, discards any unneeded rows (logs of people visiting the webpage, etc.) and keeps only the logs that record someone downloading a binary. These logs are then cleaned of any private information and converted to CSV format.
* This data is then uploaded to a different bucket called `processed-logs-nodejs`, using the same five-minute-interval naming as the original logs
* Upon completion the Cloud Run instance sends a 200 back to the subscription and the message is popped off the topic (the message is "acknowledged"), indicating that it has been read and processed and is no longer needed
* If any error occurs during processing, a 500 code is sent back and the subscription will resend the message after a time delay
* The Cloud Run instance has 1GiB of memory assigned to it, which should be enough to handle the size of the files we see
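
For reference, a Pub/Sub push delivery to the Cloud Run endpoint looks roughly like the following (values are illustrative, and fields the service does not read are omitted); the handler only inspects `message.attributes`:

```json
{
  "message": {
    "attributes": {
      "eventType": "OBJECT_FINALIZE",
      "bucketId": "cloudflare-logs-nodejs",
      "objectId": "20210105/20210105T141000Z_20210105T141500Z_adbdac1f.log.gz"
    }
  }
}
```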


### Accessing GCP from the command line

* Copy `secrets/build/infra/gcp-metrics-service-acct-key.json` to `~nodejs/`
* As user `nodejs` run `gcloud auth activate-service-account --key-file gcp-metrics-service-acct-key.json`
* Set up a new SSH key with access to [email protected] and [email protected] under user `nodejs`
1 change: 1 addition & 0 deletions ansible/roles/metrics/files/process-cloudflare/.gitignore
@@ -0,0 +1 @@
node_modules
11 changes: 11 additions & 0 deletions ansible/roles/metrics/files/process-cloudflare/Dockerfile
@@ -0,0 +1,11 @@
FROM node:15-slim

WORKDIR /usr/src/app

COPY package*.json ./

RUN npm install

COPY . ./

CMD ["npm", "run", "processLogs"]
25 changes: 25 additions & 0 deletions ansible/roles/metrics/files/process-cloudflare/package.json
@@ -0,0 +1,25 @@
{
"name": "process-cloudflare",
"version": "1.0.0",
"description": "",
"main": "process-cloudflare.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"processLogs": "node --trace-warnings process-cloudflare.js"
},
"author": "Rod <[email protected]> (http://r.va.gg/)",
"contributors": [
"Ash <[email protected]>"
],
"license": "Apache-2.0",
"dependencies": {
"@google-cloud/storage": "^5.0.0",
"body-parser": "^1.19.0",
"express": "^4.17.1",
"split2": "~3.1.1",
"strftime": "~0.10.0"
},
"devDependencies": {
"eslint": "^7.8.1"
}
}
226 changes: 226 additions & 0 deletions ansible/roles/metrics/files/process-cloudflare/process-cloudflare.js
@@ -0,0 +1,226 @@
#!/usr/bin/env node

'use strict'

const strftime = require('strftime').timezone(0)
const { Storage } = require('@google-cloud/storage')
const express = require('express')
const bodyParser = require('body-parser')
const app = express()

app.use(bodyParser.json())

const extensionRe = /\.(tar\.gz|tar\.xz|pkg|msi|exe|zip|7z)$/
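// uriRe capture groups, as consumed in logTransform2 below:
// [1] full download path, [4] version directory from the path, [5] file name,
// [6] arch directory for node.exe downloads, [8] version embedded in the file name,
// [9] file type suffix (e.g. 'linux-x64.tar.xz')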
const uriRe = /(\/+(dist|download\/+release)\/+(node-latest\.tar\.gz|([^/]+)\/+((win-x64|win-x86|x64)?\/+?node\.exe|(x64\/)?node-+(v[0-9.]+)[.-]([^? ]+))))/
const versionRe = /^v[0-9.]+$/

function determineOS (path, file, fileType) {
if (/node\.exe$/.test(file)) {
return 'win'
} else if (/\/node-latest\.tar\.gz$/.test(path)) {
return 'src'
} else if (fileType == null) {
return ''
} else if (/msi$/.test(fileType) || /^win-/.test(fileType)) {
return 'win'
} else if (/^tar\..z$/.test(fileType)) {
return 'src'
} else if (/^headers\.tar\..z$/.test(fileType)) {
return 'headers'
} else if (/^linux-/.test(fileType)) {
return 'linux'
} else if (fileType === 'pkg' || /^darwin-/.test(fileType)) {
return 'osx'
} else if (/^sunos-/.test(fileType)) {
return 'sunos'
} else if (/^aix-/.test(fileType)) {
return 'aix'
} else {
return ''
}
}

function determineArch (fileType, winArch, os) {
if (fileType != null) {
if (fileType.indexOf('x64') >= 0 || fileType === 'pkg') {
// .pkg for Node.js <= 0.12 were universal so may be used for either x64 or x86
return 'x64'
} else if (fileType.indexOf('x86') >= 0) {
return 'x86'
} else if (fileType.indexOf('armv6') >= 0) {
return 'armv6l'
} else if (fileType.indexOf('armv7') >= 0) { // 4.1.0 had a misnamed binary, no 'l' in 'armv7l'
return 'armv7l'
} else if (fileType.indexOf('arm64') >= 0) {
return 'arm64'
} else if (fileType.indexOf('ppc64le') >= 0) {
return 'ppc64le'
} else if (fileType.indexOf('ppc64') >= 0) {
return 'ppc64'
} else if (fileType.indexOf('s390x') >= 0) {
return 's390x'
}
}

if (os === 'win') {
// we get here for older .msi files and node.exe files
if (winArch && winArch.indexOf('x64') >= 0) {
// could be 'x64' or 'win-x64'
return 'x64'
} else {
// could be 'win-x86' or ''
return 'x86'
}
}

return ''
}

function logTransform2 (jsonObj) {
if (jsonObj.ClientRequestMethod !== 'GET' || // Drop anything that isn't a GET or a 200-range response
jsonObj.EdgeResponseStatus < 200 ||
jsonObj.EdgeResponseStatus >= 300) {
return
}

if (jsonObj.EdgeResponseBytes < 1024) { // unreasonably small for something we want to measure
return
}

if (!extensionRe.test(jsonObj.ClientRequestPath)) { // not a file we care about
return
}

const requestPath = jsonObj.ClientRequestPath.replace(/\/\/+/g, '/')
const uriMatch = requestPath.match(uriRe) // Check that the request is for an actual node file
if (!uriMatch) { // what is this then?
return
}

const path = uriMatch[1]
const pathVersion = uriMatch[4]
const file = uriMatch[5]
const winArch = uriMatch[6]
const fileVersion = uriMatch[8]
const fileType = uriMatch[9]

let version = ''
// version can come from the filename or the path, filename is best
// but it may not be there (e.g. node.exe) so fall back to path version
if (versionRe.test(fileVersion)) {
version = fileVersion
} else if (versionRe.test(pathVersion)) {
version = pathVersion
}

const os = determineOS(path, file, fileType)
const arch = determineArch(fileType, winArch, os)

const line = []
line.push(strftime('%Y-%m-%d', new Date(jsonObj.EdgeStartTimestamp / 1000 / 1000))) // date
line.push(jsonObj.ClientCountry.toUpperCase()) // country
line.push('') // state/province, derived from chunk.EdgeColoCode probably
line.push(jsonObj.ClientRequestPath) // URI
line.push(version) // version
line.push(os) // os
line.push(arch) // arch
line.push(jsonObj.EdgeResponseBytes)

return (`${line.join(',')}\n`)
}

async function processLogs (bucket, filename, callback) {
console.log('Node version is: ' + process.version)
console.log('BUCKET ' + bucket)
console.log('FILENAME ' + filename)
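// e.g. '20210105/20210105T141000Z_20210105T141500Z_adbdac1f.log.gz'
// -> '20210105/20210105T141000Z_20210105T141500Z' (random suffix and extensions dropped)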
let processedFile = filename.split('.')[0]
processedFile = processedFile.split('_')[0].concat('_', processedFile.split('_')[1])
console.log('PROCESSEDFILENAME ' + processedFile)
createPipeline(bucket, filename, processedFile, callback)
}

function createPipeline (bucket, filename, processedFile, callback) {
const storage = new Storage({
keyFilename: 'metrics-processor-service-key.json'
})
console.log('INSIDE CREATE PIPELINE')

const readBucket = storage.bucket(bucket)
const writeBucket = storage.bucket('processed-logs-nodejs')

readBucket.file(filename).download(function (err, contents) {
if (err) {
console.log('ERROR IN DOWNLOAD ', filename, err)
// respond 200 (rather than 500) so the message is acknowledged instead of
// being redelivered forever for a file that cannot be downloaded
callback(200)
} else {
const stringContents = contents.toString()
console.log('String length: ', stringContents.length)
const contentsArray = stringContents.split('\n')
console.log('Array Length: ', contentsArray.length)
let results = ''
for (const line of contentsArray) {
try {
const jsonparse = JSON.parse(line)
const printout = logTransform2(jsonparse)
if (printout !== undefined) { results = results.concat(printout) }
} catch (err) { console.log(err) }
}

writeBucket.file(processedFile).save(results, function (err) {
if (err) {
console.log('ERROR UPLOADING: ', err)
const used = process.memoryUsage()
for (const key in used) {
console.log(`${key} ${Math.round(used[key] / 1024 / 1024 * 100) / 100} MB`)
}
callback(500)
} else {
console.log('Upload complete')
const used = process.memoryUsage()
for (const key in used) {
console.log(`${key} ${Math.round(used[key] / 1024 / 1024 * 100) / 100} MB`)
}
callback(200)
}
})
}
})
}

app.post('/', async (req, res) => {
if (!req.body) {
const msg = 'No Pub/Sub Message received'
console.error(msg)
res.status(400).send('Bad Request: ' + msg)
return
}
if (!req.body.message) {
const msg = 'invalid Pub/Sub message format'
console.error(`error: ${msg}`)
res.status(400).send(`Bad Request: ${msg}`)
return
}
const eventType = req.body.message.attributes.eventType

if (eventType !== 'OBJECT_FINALIZE') {
const msg = `Event type is ${eventType} not OBJECT_FINALIZE`
console.error(`error ${msg}`)
res.status(400).send(`Bad Request: ${msg}`)
return
}

const bucket = req.body.message.attributes.bucketId
const filename = req.body.message.attributes.objectId
console.log('EVENT TYPE: ', eventType)
processLogs(bucket, filename, function (status) {
res.status(status).send()
})
})

const port = process.env.PORT || 8080
app.listen(port, () => {
console.log('Listening on port: ', port)
})

module.exports = app
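
For local testing the Pub/Sub push can be simulated with curl (values illustrative; the handler will then try to fetch the object from GCS, so `metrics-processor-service-key.json` must be present and valid):

```sh
curl -X POST http://localhost:8080/ \
  -H 'Content-Type: application/json' \
  -d '{"message":{"attributes":{"eventType":"OBJECT_FINALIZE","bucketId":"cloudflare-logs-nodejs","objectId":"20210105/20210105T141000Z_20210105T141500Z_adbdac1f.log.gz"}}}'
```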
84 changes: 84 additions & 0 deletions ansible/roles/metrics/files/process-cloudflare/summaries.js
@@ -0,0 +1,84 @@
#!/usr/bin/env node

// data example:
//
// day,country,region,path,version,os,arch,bytes
// 2019-10-31,US,,/dist/v13.0.1/node-v13.0.1-linux-x64.tar.xz,v13.0.1,linux,x64,20102340
//

const { pipeline, Transform } = require('stream')
const split2 = require('split2')

const csvStream = new Transform({
readableObjectMode: true,
transform (chunk, encoding, callback) {
try {
const schunk = chunk.toString()
if (!schunk.startsWith('day,country')) { // ignore header
this.push(schunk.split(','))
}
callback()
} catch (e) {
callback(e)
}
}
})

const counts = { bytes: 0, total: 0 }
function increment (type, key) {
if (!key) {
key = 'unknown'
}

if (!counts[type]) {
counts[type] = {}
}

if (counts[type][key] === undefined) {
counts[type][key] = 1
} else {
counts[type][key]++
}
}

function prepare () {
function sort (type) {
const ca = Object.entries(counts[type])
ca.sort((e1, e2) => e2[1] > e1[1] ? 1 : e2[1] < e1[1] ? -1 : 0)
counts[type] = ca.reduce((p, c) => {
p[c[0]] = c[1]
return p
}, {})
}

'country version os arch'.split(' ').forEach(sort)
}

const summaryStream = new Transform({
writableObjectMode: true,
transform (chunk, encoding, callback) {
const [date, country, region, path, version, os, arch, bytes] = chunk
increment('country', country)
increment('version', version)
increment('os', os)
increment('arch', arch)
counts.bytes += parseInt(bytes, 10)
counts.total++
callback()
}
})

pipeline(
process.stdin,
split2(),
csvStream,
summaryStream,
(err) => {
prepare()
console.log(JSON.stringify(counts, null, 2))
if (err) {
console.error('ERROR', err)
process.exit(1)
}
}
)
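
A typical invocation pipes a processed CSV file through the script, for example (object path illustrative, assuming an authenticated `gsutil`):

```sh
gsutil cat gs://processed-logs-nodejs/20210105/20210105T141000Z_20210105T141500Z | node summaries.js
```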