Skip to content
This repository was archived by the owner on Nov 21, 2023. It is now read-only.

Commit 6c88927

Browse files
whdmreid-moz
authored andcommitted
Add support for new-style gzip-compressed objects (#88)
1 parent 93c180a commit 6c88927

File tree

8 files changed

+122
-15
lines changed

8 files changed

+122
-15
lines changed

moztelemetry/heka/message_parser.py

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import boto
99
import snappy
1010
import ujson as json
11+
import json as standard_json
1112
from cStringIO import StringIO
1213
from google.protobuf.message import DecodeError
1314

@@ -24,24 +25,34 @@ def parse_heka_message(message):
2425

2526

2627
def _parse_heka_record(record):
27-
if record.message.payload:
28-
result = json.loads(record.message.payload)
29-
else:
30-
result = {}
31-
result["meta"] = {
28+
result = {"meta": {
3229
# TODO: uuid, logger, severity, env_version, pid
3330
"Timestamp": record.message.timestamp,
3431
"Type": record.message.type,
3532
"Hostname": record.message.hostname,
36-
}
33+
}}
34+
35+
if record.message.payload:
36+
payload = _parse_json(record.message.payload)
37+
else:
38+
payload = {}
3739

3840
for field in record.message.fields:
3941
name = field.name.split('.')
4042
value = field.value_string
4143
if field.value_type == 1:
42-
# TODO: handle bytes in a way that doesn't cause problems with JSON
43-
# value = field.value_bytes
44-
continue
44+
# Non-UTF8 bytes fields are currently not supported
45+
try:
46+
string = field.value_bytes[0].decode('utf-8')
47+
except UnicodeDecodeError:
48+
continue
49+
# Special case: the submission field (bytes) replaces the top level
50+
# Payload in the hindsight-based infra
51+
if name[0] == 'submission':
52+
payload = _parse_json(string)
53+
continue
54+
# handle bytes in a way that doesn't cause problems with JSON
55+
value = [string]
4556
elif field.value_type == 2:
4657
value = field.value_integer
4758
elif field.value_type == 3:
@@ -54,7 +65,12 @@ def _parse_heka_record(record):
5465
else:
5566
_add_field(result, name, value)
5667

57-
return result
68+
# merge results back into the payload/submission
69+
# payload may contain NULLed out structures in the new infra and must
70+
# therefore be the receiver of the update
71+
payload.update(result)
72+
73+
return payload
5874

5975

6076
def _add_field(container, keys, value):
@@ -68,6 +84,15 @@ def _add_field(container, keys, value):
6884
_add_field(container[key], keys, value)
6985

7086

87+
def _parse_json(string):
88+
try:
89+
result = json.loads(string)
90+
except:
91+
# Fall back to the standard parser if ujson fails
92+
result = standard_json.loads(string)
93+
return result
94+
95+
7196
def _lazyjson(content):
7297
if not isinstance(content, basestring):
7398
raise ValueError("Argument must be a string.")
@@ -88,7 +113,7 @@ class WrapperType(type(default)):
88113
def wrap(method_name):
89114
def _wrap(*args, **kwargs):
90115
if not hasattr(WrapperType, '__cache__'):
91-
setattr(WrapperType, '__cache__', json.loads(content))
116+
setattr(WrapperType, '__cache__', _parse_json(content))
92117

93118
cached = WrapperType.__cache__
94119
method = getattr(cached, method_name)

moztelemetry/spark.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
from .dataset import Dataset
1111
from .histogram import Histogram
1212

13+
from .util.streaming_gzip import streaming_gzip_wrapper
14+
1315
logger = logging.getLogger(__name__)
1416

1517
if not boto.config.has_section('Boto'):

moztelemetry/store.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
import boto3
77

8+
from .util.streaming_gzip import streaming_gzip_wrapper
9+
810

911
class S3Store:
1012

@@ -36,7 +38,11 @@ def get_key(self, key):
3638
try:
3739
# get_key must return a file-like object because that's what's
3840
# required by parse_heka_message
39-
return bucket.Object(key).get()['Body']
41+
s3object = bucket.Object(key).get()
42+
if s3object.get('ContentEncoding') == "gzip":
43+
return streaming_gzip_wrapper(s3object['Body'])
44+
else:
45+
return s3object['Body']
4046
except:
4147
raise Exception('Error retrieving key "{}" from S3'.format(key))
4248

moztelemetry/util/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# This Source Code Form is subject to the terms of the Mozilla Public
2+
# License, v. 2.0. If a copy of the MPL was not distributed with this
3+
# file, you can obtain one at http://mozilla.org/MPL/2.0/.

moztelemetry/util/streaming_gzip.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# This Source Code Form is subject to the terms of the Mozilla Public
2+
# License, v. 2.0. If a copy of the MPL was not distributed with this
3+
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
4+
5+
from gzip import GzipFile
6+
from StringIO import StringIO
7+
from os import SEEK_SET, SEEK_END
8+
9+
# boto's s3.Key and botocore's StreamingBody do not support the seek()
10+
# operation, which we need in order to pass the stream along to a
11+
# GzipFile for gzip decompression. However, the implementation of
12+
# GzipFile only uses seek() in combination with tell() on its fileobj
13+
# to determine whether EOF has been reached when parsing a new
14+
# member. Since in our case we know:
15+
# - there is only one member per fileobj
16+
# - it does not begin at EOF
17+
# - the only remaining data after decompression will be the final 8 bytes of
18+
# gzip member verification metadata
19+
# we can create:
20+
# - a modified GzipFile that does not need to use seek() to perform the
21+
# final CRC and size verification checks
22+
# - a wrapper around StreamingBody that ensures the first member passes the
23+
# not-EOF check without needing to actually perform a seek()
24+
# Combining these two shims gives us a way to transparently perform streaming
25+
# gzip decompression of s3 objects.
26+
27+
# Refer to the implementation of GzipFile for more details.
28+
29+
30+
class StreamingGzipFile(GzipFile):
31+
def __init__(self, **args):
32+
super(StreamingGzipFile, self).__init__(**args)
33+
34+
def _read_eof(self):
35+
remainder = self.decompress.unused_data + self.fileobj.read()
36+
assert(len(remainder) == 8)
37+
self.fileobj.close()
38+
self.fileobj = StringIO(remainder)
39+
super(StreamingGzipFile, self)._read_eof()
40+
41+
42+
class GzipStreamingBody:
43+
def __init__(self, sb):
44+
self.sb = sb
45+
self.eof = False
46+
47+
def tell(self):
48+
return int(self.eof)
49+
50+
def close(self):
51+
return self.sb.close()
52+
53+
def read(self, n=-1):
54+
return self.sb.read(n)
55+
56+
def seek(self, offset, whence=SEEK_SET):
57+
if whence == SEEK_END:
58+
self.eof = True
59+
60+
61+
def streaming_gzip_wrapper(body):
62+
return StreamingGzipFile(fileobj=GzipStreamingBody(body))

tests/data/test_gzip.heka

483 Bytes
Binary file not shown.

tests/data/test_gzip_mixed.heka

545 Bytes
Binary file not shown.

tests/heka/test_message_parser.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@
66
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
77
import json
88
from moztelemetry.heka import message_parser
9+
from moztelemetry.util.streaming_gzip import streaming_gzip_wrapper
910

1011

1112
def test_unpack(data_dir):
12-
for t in ["plain", "snappy", "mixed"]:
13+
for t in ["plain", "snappy", "mixed", "gzip", "gzip_mixed"]:
1314
filename = "{}/test_{}.heka".format(data_dir, t)
1415
with open(filename, "rb") as o:
16+
if "gzip" in t:
17+
o = streaming_gzip_wrapper(o)
1518
msg = 0
1619
for r, b in message_parser.unpack(o, try_snappy=True):
1720
j = json.loads(r.message.payload)
@@ -21,11 +24,14 @@ def test_unpack(data_dir):
2124

2225

2326
def test_unpack_nosnappy(data_dir):
24-
expected_counts = {"plain": 10, "snappy": 0, "mixed": 5}
27+
expected_counts = {"plain": 10, "snappy": 0, "mixed": 5,
28+
"gzip": 10, "gzip_mixed": 5}
2529
for t in expected_counts.keys():
2630
count = 0
2731
filename = "{}/test_{}.heka".format(data_dir, t)
2832
with open(filename, "rb") as o:
33+
if "gzip" in t:
34+
o = streaming_gzip_wrapper(o)
2935
try:
3036
for r, b in message_parser.unpack(o, try_snappy=False):
3137
count += 1
@@ -35,13 +41,16 @@ def test_unpack_nosnappy(data_dir):
3541

3642

3743
def test_unpack_strict(data_dir):
38-
expected_exceptions = {"plain": False, "snappy": True, "mixed": True}
44+
expected_exceptions = {"plain": False, "snappy": True, "mixed": True,
45+
"gzip": False, "gzip_mixed": True}
3946
for t in expected_exceptions.keys():
4047
count = 0
4148
filename = "{}/test_{}.heka".format(data_dir, t)
4249
threw = False
4350
got_err = False
4451
with open(filename, "rb") as o:
52+
if "gzip" in t:
53+
o = streaming_gzip_wrapper(o)
4554
try:
4655
for r, b in message_parser.unpack(o, strict=True, try_snappy=False):
4756
if r.error is not None:

0 commit comments

Comments
 (0)