You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
value="The version of the stroom-stats service" />
941
941
<propertyname="editable"value="true"/>
@@ -1039,7 +1039,7 @@
1039
1039
<propertyname="value"
1040
1040
value="localhost:2181"/>
1041
1041
<propertyname="description"
1042
-
value="The Zookeeper quorum connection string, required for service discovery" />
1042
+
value="The Zookeeper quorum connection string, required for service discovery, in the form 'host1:port1,host2:port2,host3:port3'. The root znode to use in Zookeeper is defined in the property stroom.serviceDiscovery.zookeeperBasePath" />
Copy file name to clipboardExpand all lines: stroom-pipeline/src/main/java/stroom/pipeline/server/writer/RollingKafkaAppender.java
+8-2
Original file line number
Diff line number
Diff line change
@@ -13,7 +13,6 @@
13
13
importstroom.util.spring.StroomScope;
14
14
15
15
importjavax.annotation.Resource;
16
-
importjavax.inject.Inject;
17
16
importjava.io.IOException;
18
17
19
18
@Component
@@ -31,6 +30,7 @@ public class RollingKafkaAppender extends AbstractRollingAppender {
31
30
32
31
privateStringtopic;
33
32
privateStringrecordKey;
33
+
privateStroomKafkaProducer.FlushModeflushMode;
34
34
35
35
privateStringkey;
36
36
@@ -62,7 +62,8 @@ public RollingDestination createDestination() throws IOException {
62
62
System.currentTimeMillis(),
63
63
stroomKafkaProducer,
64
64
recordKey,
65
-
topic);
65
+
topic,
66
+
flushMode);
66
67
}
67
68
68
69
@PipelineProperty(description = "The record key to apply to records, used to select patition. Replacement variables can be used in path strings such as ${feed}.")
@@ -74,4 +75,9 @@ public void setRecordKey(final String recordKey) {
74
75
publicvoidsetTopic(finalStringtopic) {
75
76
this.topic = pathCreator.replaceAll(topic);
76
77
}
78
+
79
+
@PipelineProperty(description="Flush the producer each time a message is sent")
0 commit comments