Commit bd5ac98: DSL and Pulsar docs (#34)

File tree

18 files changed

+455
-18
lines changed

18 files changed

+455
-18
lines changed
Ten new placeholder files, each containing the single line: "Replace me with the real documentation."

docs/backend/dsl.md

Lines changed: 261 additions & 1 deletion
# Bullet DSL

Bullet DSL is a configuration-based DSL that allows users to plug their data into the Bullet Backend. Instead of writing their own code to set up the Backend on their data, users can accomplish the same thing by simply providing the appropriate configuration to Bullet.

To support this, Bullet DSL provides two major components. The first reads data from a pluggable data source (the *connectors* for talking to various data sources), and the second converts that data (the *converters* for understanding your data formats) into [BulletRecords](ingestion.md).

By enabling and configuring Bullet DSL in the Backend, your backend will use these two components to read from the configured data source and convert the data into BulletRecords, without you having to write any code.

The three interfaces that the DSL uses are:

1. The **BulletConnector**: Bullet DSL's reading component
2. The **BulletRecordConverter**: Bullet DSL's converting component
3. The **Bullet Backend**: The implementation of Bullet on a Stream Processor

There is also an optional **BulletDeserializer** component that sits between the Connector and the Converter to deserialize the data.

!!!note

    For the Backend, please refer to the DSL-specific Bullet Storm setup [here](storm-setup.md#using-bullet-dsl). (Currently, only Bullet Storm supports Bullet DSL.)
## BulletConnector

BulletConnector is an abstract Java class that can be implemented to read data from different pluggable data sources. As with all our components, we provide and maintain implementations while exposing an interface for adding new ones. Currently, we support two BulletConnector implementations:

1. [KafkaConnector](https://github.com/bullet-db/bullet-dsl/blob/master/src/main/java/com/yahoo/bullet/dsl/connector/KafkaConnector.java) for connecting to [Apache Kafka](https://kafka.apache.org/)
2. [PulsarConnector](https://github.com/bullet-db/bullet-dsl/blob/master/src/main/java/com/yahoo/bullet/dsl/connector/PulsarConnector.java) for connecting to [Apache Pulsar](https://pulsar.apache.org/)

When using Bullet DSL, you will need to specify the particular BulletConnector to use. For example, if you wanted to use the KafkaConnector, you would add the following to your configuration file:

```yaml
# The classpath to the BulletConnector to use (needed for Bullet DSL!)
bullet.dsl.connector.class.name: "com.yahoo.bullet.dsl.connector.KafkaConnector"

# The read timeout duration in ms (defaults to 0)
bullet.dsl.connector.read.timeout.ms: 0

# Whether or not to asynchronously commit messages (defaults to true)
bullet.dsl.connector.async.commit.enable: true
```

All BulletConnector configuration can be found in the [default configuration file](https://github.com/bullet-db/bullet-dsl/blob/master/src/main/resources/bullet_dsl_defaults.yaml) along with the specific default configuration for both implementations.

!!!note

    If you have an unsupported data source and you want to use Bullet DSL, you will have to implement your own BulletConnector. If you do, please do consider contributing it back! Check out the BulletConnector interface [here](https://github.com/bullet-db/bullet-dsl/blob/master/src/main/java/com/yahoo/bullet/dsl/connector/BulletConnector.java).
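To give a feel for what implementing one involves, here is a minimal sketch of a custom connector. The lifecycle methods shown (`initialize`, `read`, and `close`) and the constructor shape are modeled on the linked BulletConnector source but should be treated as assumptions; verify them against the version of bullet-dsl you build against. The in-memory queue stands in for a real data source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import com.yahoo.bullet.common.BulletConfig;
import com.yahoo.bullet.dsl.BulletDSLException;
import com.yahoo.bullet.dsl.connector.BulletConnector;

// A sketch, not a drop-in implementation: the method names and signatures
// here are assumptions based on the linked BulletConnector source.
public class InMemoryConnector extends BulletConnector {
    // Stands in for a real data source; a real connector would hold a
    // client for Kafka, Pulsar, etc.
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    public InMemoryConnector(BulletConfig config) {
        super(config);
    }

    @Override
    public void initialize() throws BulletDSLException {
        // A real connector would open connections and subscribe to topics
        // here, using settings from the config.
    }

    @Override
    public List<Object> read() throws BulletDSLException {
        // Drain whatever has arrived since the last call and hand the batch
        // to the DSL, which converts each message into a BulletRecord.
        List<Object> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }

    @Override
    public void close() {
        // Release clients and connections here.
    }
}
```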
### KafkaConnector

The KafkaConnector configuration requires a few settings that are necessary to read from Kafka, including the bootstrap servers, group id, key/value deserializers, and the topics to subscribe to.

```yaml
# The list of Kafka topics to subscribe to (required)
bullet.dsl.connector.kafka.topics:
- ""

# Whether or not the KafkaConsumer should seek to the end of its subscribed topics at initialization (defaults to false)
bullet.dsl.connector.kafka.start.at.end.enable: false

# Required consumer properties
bullet.dsl.connector.kafka.bootstrap.servers: "localhost:9092"
bullet.dsl.connector.kafka.group.id:
bullet.dsl.connector.kafka.key.deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
bullet.dsl.connector.kafka.value.deserializer:
```

You can also pass additional Kafka properties to the KafkaConnector by prefixing them with `bullet.dsl.connector.kafka.`. For a complete list of properties, see the [Kafka Consumer configs](https://kafka.apache.org/0102/documentation.html#newconsumerconfigs).
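For example, pass-through settings might look like the following. The two properties shown are standard Kafka consumer configs; the values are purely illustrative, not recommendations:

```yaml
# Forwarded to the KafkaConsumer (illustrative values)
bullet.dsl.connector.kafka.max.poll.records: 500
bullet.dsl.connector.kafka.fetch.min.bytes: 1
```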
### PulsarConnector

The PulsarConnector configuration requires a few settings that are necessary to read from Pulsar. It also provides additional options to enable authentication and/or TLS.

```yaml
# The list of Pulsar topics to subscribe to (required)
bullet.dsl.connector.pulsar.topics:
- ""

# The Pulsar Schema to use (required)
bullet.dsl.connector.pulsar.schema.type: "BYTES"

# The classpath to the Pulsar Schema to use (required only if using JSON, AVRO, PROTOBUF, or CUSTOM schema)
bullet.dsl.connector.pulsar.schema.class.name:

# Required client property
bullet.dsl.connector.pulsar.client.serviceUrl: "pulsar://localhost:6650"

# Authentication properties (disabled by default)
bullet.dsl.connector.pulsar.auth.enable: false
bullet.dsl.connector.pulsar.auth.plugin.class.name:
bullet.dsl.connector.pulsar.auth.plugin.params.string:

# Required consumer properties
bullet.dsl.connector.pulsar.consumer.subscriptionName: ""
bullet.dsl.connector.pulsar.consumer.subscriptionType: "Shared"
```

Most important to note is that the connector requires a [Pulsar schema](https://pulsar.apache.org/docs/en/concepts-schema-registry/) whose type can be BYTES, STRING, JSON, AVRO, PROTOBUF, or CUSTOM (defaults to BYTES). If the schema is of any type except CUSTOM, the connector will load a schema natively supported by Pulsar. For JSON, AVRO, and PROTOBUF, the POJO class to wrap must be specified. For a CUSTOM schema, the schema class must be specified instead.

You can also pass additional Pulsar Client and Consumer properties to the PulsarConnector by prefixing them with `bullet.dsl.connector.pulsar.client` and `bullet.dsl.connector.pulsar.consumer`. For both lists of properties, see Pulsar [ClientConfigurationData](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java) and [ConsumerConfigurationData](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java).
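For example, assuming the prefixed settings map onto fields of those configuration classes, pass-through settings might look like this (illustrative values):

```yaml
# Forwarded to the Pulsar consumer (illustrative value)
bullet.dsl.connector.pulsar.consumer.receiverQueueSize: 1000

# Forwarded to the Pulsar client (illustrative value)
bullet.dsl.connector.pulsar.client.numIoThreads: 1
```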
## BulletRecordConverter

BulletRecordConverter is an abstract Java class that can be implemented to convert different types of data formats into BulletRecords.

Currently, we support three BulletRecordConverter implementations:

1. [POJOBulletRecordConverter](https://github.com/bullet-db/bullet-dsl/blob/master/src/main/java/com/yahoo/bullet/dsl/converter/POJOBulletRecordConverter.java) to convert [POJOs](https://en.wikipedia.org/wiki/Plain_old_Java_object)
2. [MapBulletRecordConverter](https://github.com/bullet-db/bullet-dsl/blob/master/src/main/java/com/yahoo/bullet/dsl/converter/MapBulletRecordConverter.java) for [Java Maps](https://docs.oracle.com/javase/8/docs/api/java/util/Map.html) of Objects
3. [AvroBulletRecordConverter](https://github.com/bullet-db/bullet-dsl/blob/master/src/main/java/com/yahoo/bullet/dsl/converter/AvroBulletRecordConverter.java) for [Apache Avro](https://avro.apache.org/)

When using Bullet DSL, you will need to specify the appropriate BulletRecordConverter to use. The converters also support taking in an optional schema (see the [Schema section](#schema) for more details and the benefits of using one). For example, to use AvroBulletRecordConverter, you would add the following to your configuration file:

```yaml
# The classpath to the BulletRecordConverter to use
bullet.dsl.converter.class.name: "com.yahoo.bullet.dsl.converter.AvroBulletRecordConverter"

# The path to the schema file to use
bullet.dsl.converter.schema.file: "your-schema-file.json"
```

!!!note "Supported Converters"

    At this moment, these are the converters that we maintain. If you need a specific converter that is not yet available, you can write your own (and hopefully contribute it back!). Check out the BulletRecordConverter interface [here](https://github.com/bullet-db/bullet-dsl/blob/master/src/main/java/com/yahoo/bullet/dsl/converter/BulletRecordConverter.java).
### POJOBulletRecordConverter

The POJOBulletRecordConverter uses Java Reflection to convert POJOs into BulletRecords. In the configuration, you need to specify the POJO class you want to convert, and when the converter is created, it will inspect the POJO with Reflection. Without a [schema](#schema), the converter will look through all fields and accept only the fields that have valid types. With a schema, the converter will only look for the fields referenced, but it will also accept getter methods; it is recommended to specify getters as references where possible.

```yaml
bullet.dsl.converter.pojo.class.name: "com.your.package.YourPOJO"
```
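As a concrete (made-up) example, a POJO like the following would work with the configuration above. Without a schema, the typed fields are picked up directly; with a schema, the getter could be referenced instead of the field:

```java
import java.util.Map;

// A hypothetical POJO for illustration (package declaration omitted).
public class YourPOJO {
    private String name;              // valid type: picked up as a STRING field
    private long count;               // valid type: picked up as a LONG field
    private Map<String, String> tags; // valid type: picked up as a MAP of STRING

    // With a schema, this getter can be specified as a reference in place
    // of the field itself, per the recommendation above.
    public String getName() {
        return name;
    }
}
```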
### MapBulletRecordConverter

The MapBulletRecordConverter is used to convert Java Maps of Objects into BulletRecords. Without a schema, it simply inserts every entry in the Map into a BulletRecord without any type-checking. If the Map contains objects of types not supported by the BulletRecord, you might have issues when serializing the record.
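To illustrate with made-up values: the first two entries below convert cleanly, while the unsupported one is inserted as-is and only causes trouble later:

```java
import java.util.HashMap;
import java.util.Map;

public class MapExample {
    public static void main(String[] args) {
        // Hypothetical input event for illustration.
        Map<String, Object> event = new HashMap<>();
        event.put("name", "example");    // STRING: supported by BulletRecord
        event.put("count", 42L);         // LONG: supported
        event.put("blob", new Object()); // unsupported: inserted without
                                         // type-checking, may fail when the
                                         // record is serialized
    }
}
```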
### AvroBulletRecordConverter

The AvroBulletRecordConverter is used to convert Avro records into BulletRecords. Without a schema, it inserts every field into a BulletRecord without any type-checking. With a schema, you get type-checking, and you can also specify a RECORD field; the converter will then accept Avro Records in addition to Maps, flattening them into the BulletRecord.
### Schema

The schema consists of a list of fields, each described by a name, reference, type, and subtype:

1. `name`: The name of the field in the BulletRecord
2. `reference`: The field to extract from the to-be-converted object
3. `type`: The type of the field
4. `subtype`: The subtype of any nested fields in this field (if any)

When using the schema:

1. The `name` of the field in the schema will be the name of the field in the BulletRecord.
2. The `reference` of the field in the schema is the field/value to be extracted from an object when it is converted to a BulletRecord.
3. If the `reference` is null, it is assumed that the `name` and the `reference` are the same.
4. The `type` must be specified and will be used for type-checking.
5. The `subtype` must be specified for certain `type` values (`LIST`, `LISTOFMAP`, `MAP`, or `MAPOFMAP`). Otherwise, it must be null.
#### Types

1. BOOLEAN
2. INTEGER
3. LONG
4. FLOAT
5. DOUBLE
6. STRING
7. LIST
8. LISTOFMAP
9. MAP
10. MAPOFMAP
11. RECORD

#### Subtypes

1. BOOLEAN
2. INTEGER
3. LONG
4. FLOAT
5. DOUBLE
6. STRING

!!!note "RECORD"

    For the RECORD type, you should normally reference a Map. For each key-value pair in the Map, a field will be inserted into the BulletRecord. Hence, the name in a RECORD field is left empty.
#### Example Schema

```json
{
  "fields": [
    {
      "name": "myBool",
      "type": "BOOLEAN"
    },
    {
      "name": "myBoolMap",
      "type": "MAP",
      "subtype": "BOOLEAN"
    },
    {
      "name": "myLongMapMap",
      "type": "MAPOFMAP",
      "subtype": "LONG"
    },
    {
      "name": "myIntFromSomeMap",
      "reference": "someMap.myInt",
      "type": "INTEGER"
    },
    {
      "name": "myIntFromSomeIntList",
      "reference": "someIntList.0",
      "type": "INTEGER"
    },
    {
      "name": "myIntFromSomeNestedMapsAndLists",
      "reference": "someMap.nestedMap.nestedList.0",
      "type": "INTEGER"
    },
    {
      "reference": "someMap",
      "type": "RECORD"
    }
  ]
}
```
## BulletDeserializer

BulletDeserializer is an abstract Java class that can be implemented to deserialize or transform the output of a BulletConnector into the input for a BulletRecordConverter. It is an *optional* component, and whether it is necessary depends on the output of your data sources. For example, if your KafkaConnector outputs byte arrays that are actually Java-serialized Maps, and you're using a MapBulletRecordConverter, you would use the JavaDeserializer, which would deserialize the byte arrays into Java Maps for the converter.
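To make that scenario concrete, this hypothetical snippet shows the producer side writing a Java-serialized Map as a byte array, which is the form the JavaDeserializer would turn back into a Map for the MapBulletRecordConverter:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class ProducerSide {
    // Hypothetical illustration: serialize an event Map into the byte[]
    // form that a connector might read off the wire and hand to the
    // JavaDeserializer.
    public static byte[] serialize(HashMap<String, Serializable> event) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(event);
        }
        return bytes.toByteArray();
    }
}
```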
Currently, we support two BulletDeserializer implementations:

1. [JavaDeserializer](https://github.com/bullet-db/bullet-dsl/blob/master/src/main/java/com/yahoo/bullet/dsl/deserializer/JavaDeserializer.java)
2. [AvroDeserializer](https://github.com/bullet-db/bullet-dsl/blob/master/src/main/java/com/yahoo/bullet/dsl/deserializer/AvroDeserializer.java)

For Bullet DSL, if you wanted to use the AvroDeserializer, you would add the following to your configuration file:

```yaml
# The classpath to the BulletDeserializer to use
bullet.dsl.deserializer.class.name: "com.yahoo.bullet.dsl.deserializer.AvroDeserializer"
```
### JavaDeserializer

The JavaDeserializer uses Java Serialization to deserialize (Java-serialized) byte arrays into objects.

### AvroDeserializer

The AvroDeserializer uses Avro to deserialize (Avro-serialized) byte arrays into Avro GenericRecords.

The deserializer must be given the Avro schema for the Avro records you want to deserialize. In the configuration, you can either provide the Avro schema file (note the `file://` prefix) or the Avro class itself (the class must be in the classpath).

```yaml
# The path to the Avro schema file to use, prefixed by "file://"
bullet.dsl.deserializer.avro.schema.file: "file://example.avsc"

# The class name of the Avro record class to deserialize
bullet.dsl.deserializer.avro.class.name: "com.your.package.YourAvro"
```
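For reference, an Avro schema file like this (a hypothetical `example.avsc` matching the placeholder names above) is what the file setting would point at:

```json
{
  "type": "record",
  "name": "YourAvro",
  "namespace": "com.your.package",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "count", "type": "long"}
  ]
}
```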

docs/backend/storm-setup.md

Lines changed: 63 additions & 7 deletions
If you have implemented your own main class (option 2 above), you just pass your main class to the `storm jar` command:

```
storm jar your-fat-jar-with-dependencies.jar \
com.yahoo.bullet.Topology \
--bullet-conf path/to/the/bullet_settings.yaml \
-c topology.acker.executors=64 \
-c topology.max.spout.pending=10000
```
And in your bullet_settings.yaml, you would have, for example:

```yaml
bullet.topology.bullet.spout.class.name: "full.package.prefix.to.your.spout.implementation"
bullet.topology.bullet.spout.args: ["arg-to-your-spout-class-for-example-a-path-to-a-config-file", "another-arg-to-your-spout-class"]
bullet.topology.bullet.spout.parallelism: 64
bullet.topology.bullet.spout.cpu.load: 200.0
bullet.topology.bullet.spout.memory.on.heap.load: 512.0
bullet.topology.bullet.spout.memory.off.heap.load: 256.0
```
You can pass other arguments to Storm using the `-c` argument. The example above uses 64 ackers and relies on Storm's [reliable message processing mechanisms](http://storm.apache.org/releases/1.1.0/Guaranteeing-message-processing.html). Certain components in the Bullet Storm topology cannot be reliable due to how Bullet currently operates: hundreds of millions of Storm tuples could go into any query running in Bullet, and it is intractable to *anchor* a single Bullet aggregation to those tuples, particularly when the results are approximate. However, you should enable acking to ensure at-least-once message delivery for the hop from your topology (or spout) to the Filter bolts, and from the Query spouts to the Filter and Join bolts. Ackers are lightweight, so you need not have as many acker tasks as there are components that ack in your topology; tweak the count accordingly. The example above also sets max spout pending to control how fast the spout emits. You could use Storm's back-pressure mechanisms in addition or in lieu of this, as you choose. We have found that max spout pending gives a much more predictable way of throttling our spouts during catch-up or data spikes.

!!! note "Main Class Arguments"

    If you run the main class without arguments or pass in the `--help` argument, you can see what these arguments mean and what others are supported.
## Using Bullet DSL

Instead of implementing your own spout or Topology, you can also use the provided DSL spout (and, optionally, the DSL bolt) with [Bullet DSL](dsl.md). To do so, add the following settings to your YAML configuration:

```yaml
bullet.topology.dsl.spout.enable: true
bullet.topology.dsl.spout.parallelism:
bullet.topology.dsl.spout.cpu.load:
bullet.topology.dsl.spout.memory.on.heap.load:
bullet.topology.dsl.spout.memory.off.heap.load:

bullet.topology.dsl.bolt.enable: false
bullet.topology.dsl.bolt.parallelism:
bullet.topology.dsl.bolt.cpu.load:
bullet.topology.dsl.bolt.memory.on.heap.load:
bullet.topology.dsl.bolt.memory.off.heap.load:

bullet.topology.dsl.deserializer.enable: false
```

If the DSL bolt is enabled in addition to the spout (the spout is always required!), Storm will read your data in the spout and convert it in the bolt. Without the bolt, reading and converting are done entirely in the spout. Separating the two by enabling the DSL bolt can lower per-worker latencies when data volume is large, and lets you scale reading and converting independently.

There is also a setting to enable [BulletDeserializer](dsl.md#bulletdeserializer), which is an optional component of Bullet DSL for deserializing data between reading and converting.
#### Setup

The Bullet Storm jar is not built with Bullet DSL or with other dependencies you may want, such as Kafka and Pulsar. Instead, you will have to either add the dependencies (the DSL fat jar and your particular connector dependencies) to the Storm launcher and worker environments, or build a fat jar with the dependencies. In Storm 1.2.2+, however, you also have the option of directly adding the following jars to the classpath in the `storm jar` command.

##### Kafka

[Kafka Clients 2.1.0](https://bintray.com/bintray/jcenter/org.apache.kafka%3Akafka-clients)

##### Pulsar

[Pulsar Client 2.2.1](https://bintray.com/bintray/jcenter/org.apache.pulsar%3Apulsar-client)

[Pulsar Client Schema 2.2.1](https://bintray.com/bintray/jcenter/org.apache.pulsar%3Apulsar-client-schema)

[Pulsar Protobuf Shaded 2.1.0-incubating](https://bintray.com/bintray/jcenter/org.apache.pulsar%3Aprotobuf-shaded)

##### Example

The following is an example for Pulsar in Storm 1.2.2+:
```
storm jar bullet-storm-0.9.1.jar \
com.yahoo.bullet.storm.Topology \
--bullet-conf ./bullet_settings.yaml \
--jars "bullet-dsl-0.1.2.jar,pulsar-client-2.2.1.jar,pulsar-client-schema-2.2.1.jar,protobuf-shaded-2.1.0-incubating.jar"
```
