
Commit 2766731

kafkareceiver: add signal-specific topic/encoding (#38985)
#### Description

Add signal-specific configuration for topic and encoding. The topics are already signal-specific by default; it just hasn't been possible to explicitly configure a different topic for each signal. Thus if you set `topic: foo`, it would be used for all signals, which can never work with this receiver. Similarly, while the default encoding is the same for all signals (i.e. `otlp_proto`), some encodings are available only for certain signals, e.g. `azure_resource_logs` is (obviously) available only for logs. This meant you could not use the same receiver for multiple signals unless they all used the same encoding.

To address both of these issues we introduce signal-specific configuration: `logs::topic`, `metrics::topic`, `traces::topic`, `logs::encoding`, `metrics::encoding`, and `traces::encoding`. The existing `topic` and `encoding` configuration has been deprecated. If the new fields are set, they take precedence; otherwise, if the deprecated fields are set, they are used. The defaults have not changed.

#### Link to tracking issue

Fixes #32735

#### Testing

Unit tests added.

#### Documentation

README updated.
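As an illustrative sketch (not part of the commit itself), the precedence rules described above play out as follows; the topic names mirror those used in the new unit tests:

```yaml
receivers:
  kafka:
    # Deprecated catch-all topic: used for any signal whose
    # signal-specific topic is not set explicitly.
    topic: legacy_topic
    metrics:
      # Explicitly set, so it takes precedence over the deprecated `topic`.
      topic: metrics_topic
```

With this configuration, logs and traces are consumed from `legacy_topic`, metrics from `metrics_topic`, and all signals use the default `otlp_proto` encoding.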
1 parent 6682df5 commit 2766731

File tree

9 files changed: +327 −119 lines changed
````diff
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: deprecation
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: kafkareceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add signal-specific topic and encoding config, deprecate existing topic/encoding config.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [32735]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
````

receiver/kafkareceiver/README.md (+71 −44)
````diff
@@ -13,9 +13,7 @@
 [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
 <!-- end autogenerated section -->
 
-Kafka receiver receives traces, metrics, and logs from Kafka. Message payload encoding is configurable.
-
-Note that metrics and logs only support OTLP.
+Kafka receiver receives telemetry data from Kafka, with configurable topics and encodings.
 
 If used in conjunction with the `kafkaexporter` configured with `include_metadata_keys`, the Kafka receiver will also propagate the Kafka headers to the downstream pipeline, giving the rest of the pipeline access to arbitrary metadata keys and values.
 
@@ -28,20 +26,19 @@ The following settings can be optionally configured:
 - `brokers` (default = localhost:9092): The list of kafka brokers.
 - `protocol_version` (default = 2.1.0): Kafka protocol version.
 - `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup
-- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to read from.
-  Only one telemetry type may be used for a given topic.
-- `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Supports encoding extensions. Tries to load an encoding extension and falls back to internal encodings if no extension was loaded. Available internal encodings:
-  - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively.
-  - `otlp_json`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively using JSON encoding.
-  - `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
-  - `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`.
-  - `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans.
-  - `zipkin_json`: the payload is deserialized into a list of Zipkin V2 JSON spans.
-  - `zipkin_thrift`: the payload is deserialized into a list of Zipkin Thrift spans.
-  - `raw`: (logs only) the payload's bytes are inserted as the body of a log record.
-  - `text`: (logs only) the payload is decoded as text and inserted as the body of a log record. By default, it uses UTF-8 to decode. You can use `text_<ENCODING>`, like `text_utf-8`, `text_shift_jis`, etc., to customize this behavior.
-  - `json`: (logs only) the payload is decoded as JSON and inserted as the body of a log record.
-  - `azure_resource_logs`: (logs only) the payload is converted from Azure Resource Logs format to OTel format.
+- `logs`
+  - `topic` (default = otlp_logs): The name of the Kafka topic from which to consume logs.
+  - `encoding` (default = otlp_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
+- `metrics`
+  - `topic` (default = otlp_metrics): The name of the Kafka topic from which to consume metrics.
+  - `encoding` (default = otlp_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
+- `traces`
+  - `topic` (default = otlp_spans): The name of the Kafka topic from which to consume traces.
+  - `encoding` (default = otlp_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
+- `topic` (Deprecated [v0.124.0]: use `logs::topic`, `traces::topic`, or `metrics::topic`.)
+  If this is set, it will take precedence over the default value for those fields.
+- `encoding` (Deprecated [v0.124.0]: use `logs::encoding`, `traces::encoding`, or `metrics::encoding`.)
+  If this is set, it will take precedence over the default value for those fields.
 - `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from
 - `client_id` (default = otel-collector): The consumer client ID that receiver will use
 - `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`.
@@ -97,14 +94,52 @@ The following settings can be optionally configured:
 - `randomization_factor`: A random factor used to calculate next backoff. Randomized interval = RetryInterval * (1 ± RandomizationFactor)
 - `max_elapsed_time`: The maximum amount of time trying to backoff before giving up. If set to 0, the retries are never stopped.
 
-Example:
+### Supported encodings
+
+The Kafka receiver supports encoding extensions, as well as the following built-in encodings.
+
+Available for all signals:
+
+- `otlp_proto`: the payload is decoded as OTLP Protobuf
+- `otlp_json`: the payload is decoded as OTLP JSON
+
+Available only for traces:
+
+- `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
+- `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`.
+- `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans.
+- `zipkin_json`: the payload is deserialized into a list of Zipkin V2 JSON spans.
+- `zipkin_thrift`: the payload is deserialized into a list of Zipkin Thrift spans.
+
+Available only for logs:
+
+- `raw`: the payload's bytes are inserted as the body of a log record.
+- `text`: the payload is decoded as text and inserted as the body of a log record. By default, it uses UTF-8 to decode. You can use `text_<ENCODING>`, like `text_utf-8`, `text_shift_jis`, etc., to customize this behavior.
+- `json`: the payload is decoded as JSON and inserted as the body of a log record.
+- `azure_resource_logs`: the payload is converted from Azure Resource Logs format to OTel format.
+
+### Message header propagation
+
+The Kafka receiver will extract Kafka message headers and include them as request metadata (context).
+This metadata can then be used throughout the pipeline, for example to set attributes using the
+[attributes processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/processor/attributesprocessor/README.md).
+
+### Example configurations
+
+#### Minimal configuration
+
+By default, the receiver does not require any configuration. With the following configuration,
+the receiver will consume messages from the default topics from localhost:9092 using the
+`otlp_proto` encoding:
 
 ```yaml
 receivers:
   kafka:
-    protocol_version: 2.0.0
 ```
-Example of connecting to kafka using SASL and TLS:
+#### TLS and authentication
+
+In this example the receiver is configured to connect to Kafka using TLS for encryption,
+and SASL/SCRAM for authentication:
 
 ```yaml
 receivers:
@@ -116,39 +151,31 @@ receivers:
       password: "secret"
       mechanism: "SCRAM-SHA-512"
 ```
-Example of header extraction:
+
+#### Header extraction
+
+In addition to propagating Kafka message headers as metadata as described above in
+[Message header propagation](#message-header-propagation), the Kafka receiver can also
+be configured to extract and attach specific headers as resource attributes, e.g.
 
 ```yaml
 receivers:
   kafka:
-    topic: test
     header_extraction:
       extract_headers: true
      headers: ["header1", "header2"]
 ```
 
-- If we feed following kafka record to `test` topic and use above configs:
-```yaml
-{
-  event: Hello,
-  headers: {
-    header1: value1,
-    header2: value2,
-  }
-}
+If we produce a Kafka message with headers "header1: value1" and "header2: value2"
+with the above configuration, the receiver will attach these headers as resource
+attributes with the prefix "kafka.header.", i.e.
+
 ```
-we will get a log record in collector similar to:
-```yaml
-{
-  ...
-  body: Hello,
-  resource: {
-    kafka.header.header1: value1,
-    kafka.header.header2: value2,
-  },
-  ...
+"resource": {
+  "attributes": {
+    "kafka.header.header1": "value1",
+    "kafka.header.header2": "value2",
+  }
 }
+...
 ```
-
-- Here you can see the kafka record header `header1` and `header2` being added to resource attribute.
-- Every **matching** kafka header key is prefixed with `kafka.header` string and attached to resource attributes.
````
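The README changes above enable a single receiver to serve signals with different encodings. A hypothetical sketch (the topic names are made up for illustration) combining a logs-only encoding with OTLP traces:

```yaml
receivers:
  kafka:
    logs:
      topic: azure_logs                # illustrative topic name
      encoding: azure_resource_logs    # logs-only encoding
    traces:
      topic: otlp_spans
      encoding: otlp_proto
```

Before this change, such a mixed-encoding receiver was impossible, because the single `encoding` field applied to every signal.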

receiver/kafkareceiver/config.go (+81 −5)
````diff
@@ -6,6 +6,7 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collect
 import (
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/config/configretry"
+	"go.opentelemetry.io/collector/confmap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
 )
@@ -17,22 +18,97 @@ type Config struct {
 	configkafka.ClientConfig   `mapstructure:",squash"`
 	configkafka.ConsumerConfig `mapstructure:",squash"`
 
-	// The name of the kafka topic to consume from (default "otlp_spans" for traces, "otlp_metrics" for metrics, "otlp_logs" for logs)
+	// Logs holds configuration about how logs should be consumed.
+	Logs TopicEncodingConfig `mapstructure:"logs"`
+
+	// Metrics holds configuration about how metrics should be consumed.
+	Metrics TopicEncodingConfig `mapstructure:"metrics"`
+
+	// Traces holds configuration about how traces should be consumed.
+	Traces TopicEncodingConfig `mapstructure:"traces"`
+
+	// Topic holds the name of the Kafka topic from which to consume data.
+	//
+	// Topic has no default. If explicitly specified, it will take precedence
+	// over the default values of Logs.Topic, Traces.Topic, and Metrics.Topic.
+	//
+	// Deprecated [v0.124.0]: Use Logs.Topic, Traces.Topic, and Metrics.Topic.
 	Topic string `mapstructure:"topic"`
 
-	// Encoding of the messages (default "otlp_proto")
+	// Encoding holds the expected encoding of messages.
+	//
+	// Encoding has no default. If explicitly specified, it will take precedence
+	// over the default values of Logs.Encoding, Traces.Encoding, and
+	// Metrics.Encoding.
+	//
+	// Deprecated [v0.124.0]: Use Logs.Encoding, Traces.Encoding, and
+	// Metrics.Encoding.
 	Encoding string `mapstructure:"encoding"`
 
-	// Controls the way the messages are marked as consumed
+	// MessageMarking controls the way the messages are marked as consumed.
 	MessageMarking MessageMarking `mapstructure:"message_marking"`
 
-	// Extract headers from kafka records
+	// HeaderExtraction controls extraction of headers from Kafka records.
 	HeaderExtraction HeaderExtraction `mapstructure:"header_extraction"`
 
-	// In case of some errors returned by the next consumer, the receiver will wait and retry the failed message
+	// ErrorBackOff controls backoff/retry behavior when the next consumer
+	// returns an error.
 	ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"`
 }
 
+func (c *Config) Unmarshal(conf *confmap.Conf) error {
+	if err := conf.Unmarshal(c); err != nil {
+		return err
+	}
+	// Check if deprecated fields have been explicitly set,
+	// in which case they should be used instead of signal-
+	// specific defaults.
+	var zeroConfig Config
+	if err := conf.Unmarshal(&zeroConfig); err != nil {
+		return err
+	}
+	if c.Topic != "" {
+		if zeroConfig.Logs.Topic == "" {
+			c.Logs.Topic = c.Topic
+		}
+		if zeroConfig.Metrics.Topic == "" {
+			c.Metrics.Topic = c.Topic
+		}
+		if zeroConfig.Traces.Topic == "" {
+			c.Traces.Topic = c.Topic
+		}
+	}
+	if c.Encoding != "" {
+		if zeroConfig.Logs.Encoding == "" {
+			c.Logs.Encoding = c.Encoding
+		}
+		if zeroConfig.Metrics.Encoding == "" {
+			c.Metrics.Encoding = c.Encoding
+		}
+		if zeroConfig.Traces.Encoding == "" {
+			c.Traces.Encoding = c.Encoding
+		}
+	}
+	return conf.Unmarshal(c)
+}
+
+// TopicEncodingConfig holds signal-specific topic and encoding configuration.
+type TopicEncodingConfig struct {
+	// Topic holds the name of the Kafka topic from which messages of the
+	// signal type should be consumed.
+	//
+	// The default depends on the signal type:
+	//  - "otlp_spans" for traces
+	//  - "otlp_metrics" for metrics
+	//  - "otlp_logs" for logs
+	Topic string `mapstructure:"topic"`
+
+	// Encoding holds the expected encoding of messages for the signal type.
+	//
+	// Defaults to "otlp_proto".
+	Encoding string `mapstructure:"encoding"`
+}
+
 type MessageMarking struct {
 	// If true, the messages are marked after the pipeline execution
 	After bool `mapstructure:"after"`
````

receiver/kafkareceiver/config_test.go (+71 −4)
````diff
@@ -46,8 +46,65 @@ func TestLoadConfig(t *testing.T) {
 				config.GroupID = "the_group_id"
 				return config
 			}(),
-			Topic:    "spans",
-			Encoding: "otlp_proto",
+			Logs: TopicEncodingConfig{
+				Topic:    "spans",
+				Encoding: "otlp_proto",
+			},
+			Metrics: TopicEncodingConfig{
+				Topic:    "spans",
+				Encoding: "otlp_proto",
+			},
+			Traces: TopicEncodingConfig{
+				Topic:    "spans",
+				Encoding: "otlp_proto",
+			},
+			Topic: "spans",
+			ErrorBackOff: configretry.BackOffConfig{
+				Enabled: false,
+			},
+		},
+	},
+	{
+		id: component.NewIDWithName(metadata.Type, "legacy_topic"),
+		expected: &Config{
+			ClientConfig:   configkafka.NewDefaultClientConfig(),
+			ConsumerConfig: configkafka.NewDefaultConsumerConfig(),
+			Logs: TopicEncodingConfig{
+				Topic:    "legacy_topic",
+				Encoding: "otlp_proto",
+			},
+			Metrics: TopicEncodingConfig{
+				Topic:    "metrics_topic",
+				Encoding: "otlp_proto",
+			},
+			Traces: TopicEncodingConfig{
+				Topic:    "legacy_topic",
+				Encoding: "otlp_proto",
+			},
+			Topic: "legacy_topic",
+			ErrorBackOff: configretry.BackOffConfig{
+				Enabled: false,
+			},
+		},
+	},
+	{
+		id: component.NewIDWithName(metadata.Type, "legacy_encoding"),
+		expected: &Config{
+			ClientConfig:   configkafka.NewDefaultClientConfig(),
+			ConsumerConfig: configkafka.NewDefaultConsumerConfig(),
+			Logs: TopicEncodingConfig{
+				Topic:    "otlp_logs",
+				Encoding: "legacy_encoding",
+			},
+			Metrics: TopicEncodingConfig{
+				Topic:    "otlp_metrics",
+				Encoding: "metrics_encoding",
+			},
+			Traces: TopicEncodingConfig{
+				Topic:    "otlp_spans",
+				Encoding: "legacy_encoding",
+			},
+			Encoding: "legacy_encoding",
 			ErrorBackOff: configretry.BackOffConfig{
 				Enabled: false,
 			},
@@ -82,8 +139,18 @@
 				config.HeartbeatInterval = 15 * time.Second
 				return config
 			}(),
-			Topic:    "logs",
-			Encoding: "direct",
+			Logs: TopicEncodingConfig{
+				Topic:    "logs",
+				Encoding: "direct",
+			},
+			Metrics: TopicEncodingConfig{
+				Topic:    "otlp_metrics",
+				Encoding: "otlp_proto",
+			},
+			Traces: TopicEncodingConfig{
+				Topic:    "otlp_spans",
+				Encoding: "otlp_proto",
+			},
 			ErrorBackOff: configretry.BackOffConfig{
 				Enabled:         true,
 				InitialInterval: 1 * time.Second,
````
