|
3 | 3 | const { pipeline, Transform } = require('stream')
|
4 | 4 | const split2 = require('split2')
|
5 | 5 | const strftime = require('strftime').timezone(0)
|
| 6 | +const {Storage} = require('@google-cloud/storage'); |
| 7 | + |
| 8 | +const storage = new Storage({keyFilename: "metrics-processor-service-key.json"}); |
6 | 9 |
|
7 | 10 | const jsonStream = new Transform({
|
8 | 11 | readableObjectMode: true,
|
@@ -140,16 +143,30 @@ const logTransformStream = new Transform({
|
140 | 143 | }
|
141 | 144 | })
|
142 | 145 |
|
143 |
| -pipeline( |
144 |
| - process.stdin, |
145 |
| - split2(), |
146 |
| - jsonStream, |
147 |
| - logTransformStream, |
148 |
| - process.stdout, |
149 |
| - (err) => { |
150 |
| - if (err) { |
151 |
| - console.error('ERROR', err) |
152 |
| - process.exit(1) |
153 |
| - } |
154 |
| - } |
155 |
| -) |
| 146 | + |
| 147 | +exports.processLogs = async () => { |
| 148 | + const file = data; |
| 149 | + bucketName = file.bucket; |
| 150 | + fileName = file.name; |
| 151 | + console.log("DATA " + data); |
| 152 | + console.log("BUCKET " + bucketName); |
| 153 | + console.log("FILENAME " + fileName); |
| 154 | + |
| 155 | + processedFile = fileName.split(".")[0]; |
| 156 | + processedFile = processedFile.split("_")[0].concat("_", processedFile.split("_")[1]); |
| 157 | + |
| 158 | + console.log("PROCESSEDFILENAME " + processedFile); |
| 159 | + |
| 160 | + storage.bucket(bucketName).file(file.name).createReadStream() |
| 161 | + .on('error', function(err) { console.error(err) }) |
| 162 | + .pipe(split2()) |
| 163 | + .pipe(jsonStream) |
| 164 | + .pipe(logTransformStream) |
| 165 | + .pipe(storage.bucket('processed-logs-nodejs').file(processedFile).createWriteStream({resumable: false, flags: 'a'}) |
| 166 | + .on("error", err => { |
| 167 | + console.log("ERROR: >> ", err) |
| 168 | + }) |
| 169 | + .on("finished", () => { |
| 170 | + console.log("FINISHED") |
| 171 | + })); |
| 172 | +} |
0 commit comments