Skip to content

Commit 20a5fec

Browse files
authored
Integrate encoding::Encoder with loki sink (#12558)
Signed-off-by: Pablo Sichert <[email protected]>
1 parent aa540a2 commit 20a5fec

File tree

12 files changed

+211
-83
lines changed

12 files changed

+211
-83
lines changed
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
use bytes::BytesMut;
2+
use serde::{Deserialize, Serialize};
3+
use tokio_util::codec::Encoder;
4+
use vector_common::encode_logfmt;
5+
use vector_core::{config::DataType, event::Event, schema};
6+
7+
/// Config used to build a `LogfmtSerializer`.
8+
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
9+
pub struct LogfmtSerializerConfig;
10+
11+
impl LogfmtSerializerConfig {
12+
/// Creates a new `LogfmtSerializerConfig`.
13+
pub const fn new() -> Self {
14+
Self
15+
}
16+
17+
/// Build the `LogfmtSerializer` from this configuration.
18+
pub const fn build(&self) -> LogfmtSerializer {
19+
LogfmtSerializer
20+
}
21+
22+
/// The data type of events that are accepted by `JsonSerializer`.
23+
pub fn input_type(&self) -> DataType {
24+
DataType::Log
25+
}
26+
27+
/// The schema required by the serializer.
28+
pub fn schema_requirement(&self) -> schema::Requirement {
29+
// While technically we support `Value` variants that can't be losslessly serialized to
30+
// logfmt, we don't want to enforce that limitation to users yet.
31+
schema::Requirement::empty()
32+
}
33+
}
34+
35+
/// Serializer that converts an `Event` to bytes using the logfmt format.
36+
#[derive(Debug, Clone)]
37+
pub struct LogfmtSerializer;
38+
39+
impl LogfmtSerializer {
40+
/// Creates a new `LogfmtSerializer`.
41+
pub const fn new() -> Self {
42+
Self
43+
}
44+
}
45+
46+
impl Encoder<Event> for LogfmtSerializer {
47+
type Error = vector_core::Error;
48+
49+
fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
50+
let log = event.as_log();
51+
let string = encode_logfmt::to_string(log.as_map())?;
52+
buffer.extend_from_slice(string.as_bytes());
53+
54+
Ok(())
55+
}
56+
}
57+
58+
#[cfg(test)]
59+
mod tests {
60+
use super::*;
61+
use bytes::BytesMut;
62+
use vector_common::btreemap;
63+
use vector_core::event::Value;
64+
65+
#[test]
66+
fn serialize_logfmt() {
67+
let event = Event::from(btreemap! {
68+
"foo" => Value::from("bar")
69+
});
70+
let mut serializer = LogfmtSerializer::new();
71+
let mut bytes = BytesMut::new();
72+
73+
serializer.encode(event, &mut bytes).unwrap();
74+
75+
assert_eq!(bytes.freeze(), "foo=bar");
76+
}
77+
}

lib/codecs/src/encoding/format/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#![deny(missing_docs)]
55

66
mod json;
7+
mod logfmt;
78
mod native;
89
mod native_json;
910
mod raw_message;
@@ -13,6 +14,7 @@ use std::fmt::Debug;
1314

1415
use dyn_clone::DynClone;
1516
pub use json::{JsonSerializer, JsonSerializerConfig};
17+
pub use logfmt::{LogfmtSerializer, LogfmtSerializerConfig};
1618
pub use native::{NativeSerializer, NativeSerializerConfig};
1719
pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig};
1820
pub use raw_message::{RawMessageSerializer, RawMessageSerializerConfig};

lib/codecs/src/encoding/mod.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ use std::fmt::Debug;
88

99
use bytes::BytesMut;
1010
pub use format::{
11-
JsonSerializer, JsonSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig,
12-
NativeSerializer, NativeSerializerConfig, RawMessageSerializer, RawMessageSerializerConfig,
13-
TextSerializer, TextSerializerConfig,
11+
JsonSerializer, JsonSerializerConfig, LogfmtSerializer, LogfmtSerializerConfig,
12+
NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, NativeSerializerConfig,
13+
RawMessageSerializer, RawMessageSerializerConfig, TextSerializer, TextSerializerConfig,
1414
};
1515
pub use framing::{
1616
BoxedFramer, BoxedFramingError, BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder,
@@ -183,6 +183,8 @@ impl tokio_util::codec::Encoder<()> for Framer {
183183
pub enum SerializerConfig {
184184
/// Configures the `JsonSerializer`.
185185
Json,
186+
/// Configures the `LogfmtSerializer`.
187+
Logfmt,
186188
/// Configures the `NativeSerializer`.
187189
Native,
188190
/// Configures the `NativeJsonSerializer`.
@@ -199,6 +201,12 @@ impl From<JsonSerializerConfig> for SerializerConfig {
199201
}
200202
}
201203

204+
impl From<LogfmtSerializerConfig> for SerializerConfig {
205+
fn from(_: LogfmtSerializerConfig) -> Self {
206+
Self::Logfmt
207+
}
208+
}
209+
202210
impl From<NativeSerializerConfig> for SerializerConfig {
203211
fn from(_: NativeSerializerConfig) -> Self {
204212
Self::Native
@@ -228,6 +236,7 @@ impl SerializerConfig {
228236
pub const fn build(&self) -> Serializer {
229237
match self {
230238
SerializerConfig::Json => Serializer::Json(JsonSerializerConfig.build()),
239+
SerializerConfig::Logfmt => Serializer::Logfmt(LogfmtSerializerConfig.build()),
231240
SerializerConfig::Native => Serializer::Native(NativeSerializerConfig.build()),
232241
SerializerConfig::NativeJson => {
233242
Serializer::NativeJson(NativeJsonSerializerConfig.build())
@@ -243,6 +252,7 @@ impl SerializerConfig {
243252
pub fn input_type(&self) -> DataType {
244253
match self {
245254
SerializerConfig::Json => JsonSerializerConfig.input_type(),
255+
SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
246256
SerializerConfig::Native => NativeSerializerConfig.input_type(),
247257
SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
248258
SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(),
@@ -254,6 +264,7 @@ impl SerializerConfig {
254264
pub fn schema_requirement(&self) -> schema::Requirement {
255265
match self {
256266
SerializerConfig::Json => JsonSerializerConfig.schema_requirement(),
267+
SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
257268
SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
258269
SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
259270
SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(),
@@ -267,6 +278,8 @@ impl SerializerConfig {
267278
pub enum Serializer {
268279
/// Uses a `JsonSerializer` for serialization.
269280
Json(JsonSerializer),
281+
/// Uses a `LogfmtSerializer` for serialization.
282+
Logfmt(LogfmtSerializer),
270283
/// Uses a `NativeSerializer` for serialization.
271284
Native(NativeSerializer),
272285
/// Uses a `NativeJsonSerializer` for serialization.
@@ -283,6 +296,12 @@ impl From<JsonSerializer> for Serializer {
283296
}
284297
}
285298

299+
impl From<LogfmtSerializer> for Serializer {
300+
fn from(serializer: LogfmtSerializer) -> Self {
301+
Self::Logfmt(serializer)
302+
}
303+
}
304+
286305
impl From<NativeSerializer> for Serializer {
287306
fn from(serializer: NativeSerializer) -> Self {
288307
Self::Native(serializer)
@@ -313,6 +332,7 @@ impl tokio_util::codec::Encoder<Event> for Serializer {
313332
fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
314333
match self {
315334
Serializer::Json(serializer) => serializer.encode(event, buffer),
335+
Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
316336
Serializer::Native(serializer) => serializer.encode(event, buffer),
317337
Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
318338
Serializer::RawMessage(serializer) => serializer.encode(event, buffer),

lib/codecs/src/lib.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ pub use decoding::{SyslogDeserializer, SyslogDeserializerConfig};
1919
pub use encoding::{
2020
BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder, CharacterDelimitedEncoderConfig,
2121
JsonSerializer, JsonSerializerConfig, LengthDelimitedEncoder, LengthDelimitedEncoderConfig,
22-
NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, NativeSerializerConfig,
23-
NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig, RawMessageSerializer,
24-
RawMessageSerializerConfig, TextSerializer, TextSerializerConfig,
22+
LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig,
23+
NativeSerializer, NativeSerializerConfig, NewlineDelimitedEncoder,
24+
NewlineDelimitedEncoderConfig, RawMessageSerializer, RawMessageSerializerConfig,
25+
TextSerializer, TextSerializerConfig,
2526
};

src/codecs/encoder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ impl Encoder<Framer> {
116116
(Serializer::Native(_), _) => "application/octet-stream",
117117
(
118118
Serializer::Json(_)
119+
| Serializer::Logfmt(_)
119120
| Serializer::NativeJson(_)
120121
| Serializer::RawMessage(_)
121122
| Serializer::Text(_),

src/sinks/aws_s3/config.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,13 @@ impl S3SinkConfig {
158158
// TODO: We probably want to use something like octet framing here.
159159
return Err("Native encoding is not implemented for this sink yet".into());
160160
}
161-
(None, Serializer::NativeJson(_) | Serializer::RawMessage(_) | Serializer::Text(_)) => {
162-
NewlineDelimitedEncoder::new().into()
163-
}
161+
(
162+
None,
163+
Serializer::Logfmt(_)
164+
| Serializer::NativeJson(_)
165+
| Serializer::RawMessage(_)
166+
| Serializer::Text(_),
167+
) => NewlineDelimitedEncoder::new().into(),
164168
};
165169
let encoder = Encoder::<Framer>::new(framer, serializer);
166170

src/sinks/azure_blob/config.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,13 @@ impl AzureBlobSinkConfig {
143143
// TODO: We probably want to use something like octet framing here.
144144
return Err("Native encoding is not implemented for this sink yet".into());
145145
}
146-
(None, Serializer::NativeJson(_) | Serializer::RawMessage(_) | Serializer::Text(_)) => {
147-
NewlineDelimitedEncoder::new().into()
148-
}
146+
(
147+
None,
148+
Serializer::Logfmt(_)
149+
| Serializer::NativeJson(_)
150+
| Serializer::RawMessage(_)
151+
| Serializer::Text(_),
152+
) => NewlineDelimitedEncoder::new().into(),
149153
};
150154
let encoder = Encoder::<Framer>::new(framer, serializer);
151155

src/sinks/gcp/cloud_storage.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -286,9 +286,13 @@ impl RequestSettings {
286286
(Some(framer), _) => framer,
287287
(None, Serializer::Json(_)) => CharacterDelimitedEncoder::new(b',').into(),
288288
(None, Serializer::Native(_)) => LengthDelimitedEncoder::new().into(),
289-
(None, Serializer::NativeJson(_) | Serializer::RawMessage(_) | Serializer::Text(_)) => {
290-
NewlineDelimitedEncoder::new().into()
291-
}
289+
(
290+
None,
291+
Serializer::Logfmt(_)
292+
| Serializer::NativeJson(_)
293+
| Serializer::RawMessage(_)
294+
| Serializer::Text(_),
295+
) => NewlineDelimitedEncoder::new().into(),
292296
};
293297
let encoder = Encoder::<Framer>::new(framer, serializer);
294298
let acl = config

src/sinks/loki/config.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
use std::collections::HashMap;
22

3+
use codecs::encoding::SerializerConfig;
4+
use codecs::{JsonSerializerConfig, LogfmtSerializerConfig, TextSerializerConfig};
35
use futures::future::FutureExt;
46
use serde::{Deserialize, Serialize};
57

68
use super::{healthcheck::healthcheck, sink::LokiSink};
9+
use crate::sinks::util::encoding::{EncodingConfigAdapter, EncodingConfigMigrator};
710
use crate::sinks::util::Compression;
811
use crate::{
912
config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
@@ -18,15 +21,28 @@ use crate::{
1821
tls::{TlsConfig, TlsSettings},
1922
};
2023

24+
#[derive(Debug, Clone, Serialize, Deserialize)]
25+
pub struct EncodingMigrator;
26+
27+
impl EncodingConfigMigrator for EncodingMigrator {
28+
type Codec = Encoding;
29+
30+
fn migrate(codec: &Self::Codec) -> SerializerConfig {
31+
match codec {
32+
Encoding::Json => JsonSerializerConfig::new().into(),
33+
Encoding::Text => TextSerializerConfig::new().into(),
34+
Encoding::Logfmt => LogfmtSerializerConfig::new().into(),
35+
}
36+
}
37+
}
38+
2139
#[derive(Clone, Debug, Deserialize, Serialize)]
2240
#[serde(deny_unknown_fields)]
2341
pub struct LokiConfig {
2442
pub endpoint: UriSerde,
25-
pub encoding: EncodingConfig<Encoding>,
26-
43+
pub encoding: EncodingConfigAdapter<EncodingConfig<Encoding>, EncodingMigrator>,
2744
pub tenant_id: Option<Template>,
2845
pub labels: HashMap<Template, Template>,
29-
3046
#[serde(default = "crate::serde::default_false")]
3147
pub remove_label_fields: bool,
3248
#[serde(default = "crate::serde::default_true")]
@@ -35,17 +51,12 @@ pub struct LokiConfig {
3551
pub compression: Compression,
3652
#[serde(default)]
3753
pub out_of_order_action: OutOfOrderAction,
38-
3954
pub auth: Option<Auth>,
40-
4155
#[serde(default)]
4256
pub request: TowerRequestConfig,
43-
4457
#[serde(default)]
4558
pub batch: BatchConfig<LokiDefaultBatchSettings>,
46-
4759
pub tls: Option<TlsConfig>,
48-
4960
#[serde(
5061
default,
5162
deserialize_with = "crate::serde::bool_or_struct",
@@ -85,7 +96,7 @@ impl GenerateConfig for LokiConfig {
8596
fn generate_config() -> toml::Value {
8697
toml::from_str(
8798
r#"endpoint = "http://localhost:3100"
88-
encoding = "json"
99+
encoding.codec = "json"
89100
labels = {}"#,
90101
)
91102
.unwrap()
@@ -132,7 +143,7 @@ impl SinkConfig for LokiConfig {
132143
}
133144

134145
fn input(&self) -> Input {
135-
Input::log()
146+
Input::new(self.encoding.config().input_type())
136147
}
137148

138149
fn sink_type(&self) -> &'static str {

src/sinks/loki/event.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{collections::HashMap, io};
22

3+
use bytes::Bytes;
34
use serde::{ser::SerializeSeq, Serialize};
45
use vector_core::{
56
event::{EventFinalizers, Finalizable},
@@ -52,7 +53,7 @@ impl From<Vec<LokiRecord>> for LokiBatch {
5253
#[derive(Clone, Debug)]
5354
pub struct LokiEvent {
5455
pub timestamp: i64,
55-
pub event: String,
56+
pub event: Bytes,
5657
}
5758

5859
impl ByteSizeOf for LokiEvent {
@@ -68,7 +69,8 @@ impl Serialize for LokiEvent {
6869
{
6970
let mut seq = serializer.serialize_seq(Some(2))?;
7071
seq.serialize_element(&self.timestamp.to_string())?;
71-
seq.serialize_element(&self.event)?;
72+
let event = String::from_utf8_lossy(&self.event);
73+
seq.serialize_element(&event)?;
7274
seq.end()
7375
}
7476
}

0 commit comments

Comments
 (0)