Skip to content

Commit 9ea1a4e

Browse files
BE: Messages: Implement null filtering for CEL (#1050)
Co-authored-by: German Osin <[email protected]>
1 parent b768df6 commit 9ea1a4e

File tree

2 files changed

+42
-2
lines changed

2 files changed

+42
-2
lines changed

api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.fasterxml.jackson.databind.ObjectMapper;
88
import com.google.common.collect.ImmutableCollection;
99
import com.google.common.collect.ImmutableSet;
10+
import com.google.protobuf.NullValue;
1011
import dev.cel.common.CelAbstractSyntaxTree;
1112
import dev.cel.common.CelOptions;
1213
import dev.cel.common.CelValidationException;
@@ -26,6 +27,7 @@
2627
import io.kafbat.ui.exception.CelException;
2728
import io.kafbat.ui.model.TopicMessageDTO;
2829
import java.util.HashMap;
30+
import java.util.LinkedHashMap;
2931
import java.util.Map;
3032
import java.util.Objects;
3133
import java.util.Optional;
@@ -38,11 +40,13 @@
3840
@Slf4j
3941
@UtilityClass
4042
public class MessageFilters {
43+
4144
private static final String CEL_RECORD_VAR_NAME = "record";
4245
private static final String CEL_RECORD_TYPE_NAME = TopicMessageDTO.class.getSimpleName();
4346

4447
private static final CelCompiler CEL_COMPILER = createCompiler();
4548
private static final CelRuntime CEL_RUNTIME = createRuntime();
49+
private static final Object CELL_NULL_VALUE = NullValue.NULL_VALUE;
4650

4751
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
4852

@@ -188,10 +192,34 @@ private static Object parseToJsonOrReturnAsIs(@Nullable String str) {
188192
}
189193

190194
try {
191-
return OBJECT_MAPPER.readValue(str, new TypeReference<Map<String, Object>>() {
192-
});
195+
//@formatter:off
196+
var map = OBJECT_MAPPER.readValue(str, new TypeReference<Map<String, Object>>() {});
197+
//@formatter:on
198+
return replaceCelNulls(map);
193199
} catch (JsonProcessingException e) {
194200
return str;
195201
}
196202
}
203+
204+
@SuppressWarnings("unchecked")
205+
private static Map<String, Object> replaceCelNulls(Map<String, Object> map) {
206+
var result = new LinkedHashMap<String, Object>();
207+
208+
for (var entry : map.entrySet()) {
209+
String key = entry.getKey();
210+
Object value = entry.getValue();
211+
212+
if (value == null) {
213+
result.put(key, CELL_NULL_VALUE);
214+
} else if (value instanceof Map<?, ?>) {
215+
var inner = (Map<String, Object>) value;
216+
result.put(key, replaceCelNulls(inner));
217+
} else {
218+
result.put(key, value);
219+
}
220+
}
221+
222+
return result;
223+
}
224+
197225
}

api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,18 @@ void filterSpeedIsAtLeast5kPerSec() {
201201
assertThat(took).isLessThan(1000);
202202
assertThat(matched).isPositive();
203203
}
204+
205+
@Test
206+
void nullFiltering() {
207+
String msg = "{ \"field\": { \"inner\": null } }";
208+
209+
var f = celScriptFilter("record.value.field.inner == null");
210+
assertTrue(f.test(msg().content(msg)));
211+
212+
f = celScriptFilter("record.value.field.inner != null");
213+
assertFalse(f.test(msg().content(msg)));
214+
}
215+
204216
}
205217

206218
@Test

0 commit comments

Comments
 (0)