Skip to content

Commit b972db0

Browse files
srikanthccvlzchen
authored andcommitted
Add support for OTLP Log exporter (open-telemetry#1943)
1 parent 4bc1924 commit b972db0

File tree

3 files changed

+677
-0
lines changed

3 files changed

+677
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
# Copyright The OpenTelemetry Authors
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
from typing import Optional, Sequence
15+
from grpc import ChannelCredentials, Compression
16+
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
17+
OTLPExporterMixin,
18+
_translate_key_values,
19+
get_resource_data,
20+
_translate_value,
21+
)
22+
from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import (
23+
ExportLogsServiceRequest,
24+
)
25+
from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import (
26+
LogsServiceStub,
27+
)
28+
from opentelemetry.proto.common.v1.common_pb2 import InstrumentationLibrary
29+
from opentelemetry.proto.logs.v1.logs_pb2 import (
30+
InstrumentationLibraryLogs,
31+
ResourceLogs,
32+
SeverityNumber,
33+
)
34+
from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord
35+
from opentelemetry.sdk.logs import LogRecord as SDKLogRecord
36+
from opentelemetry.sdk.logs import LogData
37+
from opentelemetry.sdk.logs.export import LogExporter, LogExportResult
38+
39+
40+
class OTLPLogExporter(
41+
LogExporter,
42+
OTLPExporterMixin[SDKLogRecord, ExportLogsServiceRequest, LogExportResult],
43+
):
44+
45+
_result = LogExportResult
46+
_stub = LogsServiceStub
47+
48+
def __init__(
49+
self,
50+
endpoint: Optional[str] = None,
51+
insecure: Optional[bool] = None,
52+
credentials: Optional[ChannelCredentials] = None,
53+
headers: Optional[Sequence] = None,
54+
timeout: Optional[int] = None,
55+
compression: Optional[Compression] = None,
56+
):
57+
super().__init__(
58+
**{
59+
"endpoint": endpoint,
60+
"insecure": insecure,
61+
"credentials": credentials,
62+
"headers": headers,
63+
"timeout": timeout,
64+
"compression": compression,
65+
}
66+
)
67+
68+
def _translate_name(self, log_data: LogData) -> None:
69+
self._collector_log_kwargs["name"] = log_data.log_record.name
70+
71+
def _translate_time(self, log_data: LogData) -> None:
72+
self._collector_log_kwargs[
73+
"time_unix_nano"
74+
] = log_data.log_record.timestamp
75+
76+
def _translate_span_id(self, log_data: LogData) -> None:
77+
self._collector_log_kwargs[
78+
"span_id"
79+
] = log_data.log_record.span_id.to_bytes(8, "big")
80+
81+
def _translate_trace_id(self, log_data: LogData) -> None:
82+
self._collector_log_kwargs[
83+
"trace_id"
84+
] = log_data.log_record.trace_id.to_bytes(16, "big")
85+
86+
def _translate_trace_flags(self, log_data: LogData) -> None:
87+
self._collector_log_kwargs["flags"] = int(
88+
log_data.log_record.trace_flags
89+
)
90+
91+
def _translate_body(self, log_data: LogData):
92+
self._collector_log_kwargs["body"] = _translate_value(
93+
log_data.log_record.body
94+
)
95+
96+
def _translate_severity_text(self, log_data: LogData):
97+
self._collector_log_kwargs[
98+
"severity_text"
99+
] = log_data.log_record.severity_text
100+
101+
def _translate_attributes(self, log_data: LogData) -> None:
102+
if log_data.log_record.attributes:
103+
self._collector_log_kwargs["attributes"] = []
104+
for key, value in log_data.log_record.attributes.items():
105+
try:
106+
self._collector_log_kwargs["attributes"].append(
107+
_translate_key_values(key, value)
108+
)
109+
except Exception: # pylint: disable=broad-except
110+
pass
111+
112+
def _translate_data(
113+
self, data: Sequence[LogData]
114+
) -> ExportLogsServiceRequest:
115+
# pylint: disable=attribute-defined-outside-init
116+
117+
sdk_resource_instrumentation_library_logs = {}
118+
119+
for log_data in data:
120+
resource = log_data.log_record.resource
121+
122+
instrumentation_library_logs_map = (
123+
sdk_resource_instrumentation_library_logs.get(resource, {})
124+
)
125+
if not instrumentation_library_logs_map:
126+
sdk_resource_instrumentation_library_logs[
127+
resource
128+
] = instrumentation_library_logs_map
129+
130+
instrumentation_library_logs = (
131+
instrumentation_library_logs_map.get(
132+
log_data.instrumentation_info
133+
)
134+
)
135+
if not instrumentation_library_logs:
136+
if log_data.instrumentation_info is not None:
137+
instrumentation_library_logs_map[
138+
log_data.instrumentation_info
139+
] = InstrumentationLibraryLogs(
140+
instrumentation_library=InstrumentationLibrary(
141+
name=log_data.instrumentation_info.name,
142+
version=log_data.instrumentation_info.version,
143+
)
144+
)
145+
else:
146+
instrumentation_library_logs_map[
147+
log_data.instrumentation_info
148+
] = InstrumentationLibraryLogs()
149+
150+
instrumentation_library_logs = (
151+
instrumentation_library_logs_map.get(
152+
log_data.instrumentation_info
153+
)
154+
)
155+
156+
self._collector_log_kwargs = {}
157+
158+
self._translate_name(log_data)
159+
self._translate_time(log_data)
160+
self._translate_span_id(log_data)
161+
self._translate_trace_id(log_data)
162+
self._translate_trace_flags(log_data)
163+
self._translate_body(log_data)
164+
self._translate_severity_text(log_data)
165+
self._translate_attributes(log_data)
166+
167+
self._collector_log_kwargs["severity_number"] = getattr(
168+
SeverityNumber,
169+
"SEVERITY_NUMBER_{}".format(log_data.log_record.severity_text),
170+
)
171+
172+
instrumentation_library_logs.logs.append(
173+
PB2LogRecord(**self._collector_log_kwargs)
174+
)
175+
176+
return ExportLogsServiceRequest(
177+
resource_logs=get_resource_data(
178+
sdk_resource_instrumentation_library_logs,
179+
ResourceLogs,
180+
"logs",
181+
)
182+
)
183+
184+
def export(self, batch: Sequence[LogData]) -> LogExportResult:
185+
return self._export(batch)
186+
187+
def shutdown(self) -> None:
188+
pass

exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)