Skip to content

Commit b776439

Browse files
bors[bot]fhennig
andauthored
Merge #102
102: Discovery r=fhennig a=fhennig ## Description - Added discovery to druid: The SQL connect string is written to a config map - Fixed the role services - they were referencing a port by name that doesn't have that name anymore Co-authored-by: Felix Hennig <[email protected]> Co-authored-by: Felix Hennig <[email protected]>
2 parents 587cac1 + 3eae735 commit b776439

File tree

9 files changed

+241
-14
lines changed

9 files changed

+241
-14
lines changed

CHANGELOG.md

+6-1
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,18 @@ All notable changes to this project will be documented in this file.
66

77
### Changed
88

9+
- Fixed a port reference in the role services ([#102])
910
- Shut down gracefully ([#101]).
1011

12+
### Added
13+
14+
- Added the discovery ConfigMap creation ([#102])
15+
1116
[#101]: https://github.com/stackabletech/druid-operator/pull/101
17+
[#102]: https://github.com/stackabletech/druid-operator/pull/102
1218

1319
## [0.2.0] - 2021-12-23
1420

15-
1621
### Changed
1722

1823
- Migrated to StatefulSet rather than direct Pod management ([#59]).

docs/modules/ROOT/pages/usage.adoc

+11-3
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ Then a cluster can be deployed using the example below. Make sure you have *exac
3737
apiVersion: druid.stackable.tech/v1alpha1
3838
kind: DruidCluster
3939
metadata:
40-
name: simple
40+
name: simple-druid
4141
spec:
4242
version: 0.22.0
4343
zookeeperReference: simple-zk
@@ -79,7 +79,7 @@ spec:
7979
config: {}
8080
replicas: 1
8181

82-
The Router is hosting the web UI, a `NodePort` service is created by the operator to access the web UI. Connect to the `simple-router` `NodePort` service and follow the https://druid.apache.org/docs/latest/tutorials/index.html#step-4-load-data[druid documentation] on how to load and query sample data.
82+
The Router is hosting the web UI, a `NodePort` service is created by the operator to access the web UI. Connect to the `simple-druid-router` `NodePort` service and follow the https://druid.apache.org/docs/latest/tutorials/index.html#step-4-load-data[druid documentation] on how to load and query sample data.
8383

8484
== Using S3
8585

@@ -106,4 +106,12 @@ This allows to ingest data from accessible buckets already. To configure a bucke
106106
deepStorage:
107107
storageType: s3
108108
bucket: druid-deepstorage
109-
baseKey: storage # the base key is the prefix to be used; optional
109+
baseKey: storage # the base key is the prefix to be used; optional
110+
111+
== Connecting to Druid from other Services
112+
113+
The operator creates a `ConfigMap` with the name of the cluster which contains connection information. Following our example above (the name of the cluster is `simple-druid`) a `ConfigMap` with the name `simple-druid` will be created containing 3 keys:
114+
115+
- `DRUID_ROUTER` with the format `<host>:<port>`, which points to the router processes HTTP endpoint. Here you can connect to the web UI, or use REST endpoints such as `/druid/v2/sql/` to query data. https://druid.apache.org/docs/latest/querying/sql.html#http-post[More information in the Druid Docs].
116+
- `DRUID_AVATICA_JDBC` contains a JDBC connect string which can be used together with the https://calcite.apache.org/avatica/downloads/[Avatica JDBC Driver] to connect to Druid and query data. https://druid.apache.org/docs/latest/querying/sql.html#jdbc[More information in the Druid Docs].
117+
- `DRUID_SQALCHEMY` contains a connection string used to connect to Druid with SQAlchemy, in - for example - Apache Superset.

examples/derby/druidcluster.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
apiVersion: druid.stackable.tech/v1alpha1
22
kind: DruidCluster
33
metadata:
4-
name: simple
4+
name: derby-druid
55
spec:
66
version: 0.22.0
77
zookeeperReference:

examples/psql-s3/druidcluster.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
apiVersion: druid.stackable.tech/v1alpha1
22
kind: DruidCluster
33
metadata:
4-
name: simple
4+
name: psqls3-druid
55
spec:
66
version: 0.22.0
77
zookeeperReference:

examples/psql/druidcluster.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
apiVersion: druid.stackable.tech/v1alpha1
22
kind: DruidCluster
33
metadata:
4-
name: simple
4+
name: psql-druid
55
spec:
66
version: 0.22.0
77
zookeeperReference:

rust/crd/src/lib.rs

+116-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use serde::{Deserialize, Serialize};
2-
use stackable_operator::kube::CustomResource;
3-
use stackable_operator::product_config_utils::{ConfigError, Configuration};
4-
use stackable_operator::role_utils::Role;
5-
use stackable_operator::schemars::{self, JsonSchema};
2+
use stackable_operator::{
3+
kube::CustomResource,
4+
product_config_utils::{ConfigError, Configuration},
5+
role_utils::Role,
6+
schemars::{self, JsonSchema},
7+
};
68
use std::collections::BTreeMap;
79
use std::str::FromStr;
810
use strum_macros::Display;
@@ -145,6 +147,7 @@ impl DruidRole {
145147
}
146148

147149
impl DruidCluster {
150+
/// The spec for the given Role
148151
pub fn get_role(&self, role: &DruidRole) -> &Role<DruidConfig> {
149152
match role {
150153
DruidRole::Coordinator => &self.spec.coordinators,
@@ -154,6 +157,20 @@ impl DruidCluster {
154157
DruidRole::Router => &self.spec.routers,
155158
}
156159
}
160+
161+
/// The name of the role-level load-balanced Kubernetes `Service`
162+
pub fn role_service_name(&self, role: &DruidRole) -> Option<String> {
163+
Some(format!("{}-{}", self.metadata.name.clone()?, role))
164+
}
165+
166+
/// The fully-qualified domain name of the role-level load-balanced Kubernetes `Service`
167+
pub fn role_service_fqdn(&self, role: &DruidRole) -> Option<String> {
168+
Some(format!(
169+
"{}.{}.svc.cluster.local",
170+
self.role_service_name(role)?,
171+
self.metadata.namespace.as_ref()?
172+
))
173+
}
157174
}
158175

159176
#[derive(Clone, Debug, Default, Deserialize, JsonSchema, Serialize)]
@@ -361,3 +378,98 @@ fn build_string_list(strings: &[String]) -> String {
361378
let comma_list = quoted_strings.join(", ");
362379
format!("[{}]", comma_list)
363380
}
381+
382+
#[cfg(test)]
383+
mod tests {
384+
use super::*;
385+
use stackable_operator::role_utils::CommonConfiguration;
386+
use stackable_operator::role_utils::RoleGroup;
387+
use std::array::IntoIter;
388+
use std::collections::HashMap;
389+
390+
#[test]
391+
fn test_service_name_generation() {
392+
let mut cluster = DruidCluster::new(
393+
"testcluster",
394+
DruidClusterSpec {
395+
stopped: None,
396+
version: "".to_string(),
397+
brokers: Role {
398+
config: CommonConfiguration {
399+
config: DruidConfig {},
400+
config_overrides: Default::default(),
401+
env_overrides: Default::default(),
402+
cli_overrides: Default::default(),
403+
},
404+
role_groups: Default::default(),
405+
},
406+
coordinators: Role {
407+
config: CommonConfiguration {
408+
config: DruidConfig {},
409+
config_overrides: Default::default(),
410+
env_overrides: Default::default(),
411+
cli_overrides: Default::default(),
412+
},
413+
role_groups: Default::default(),
414+
},
415+
historicals: Role {
416+
config: CommonConfiguration {
417+
config: DruidConfig {},
418+
config_overrides: Default::default(),
419+
env_overrides: Default::default(),
420+
cli_overrides: Default::default(),
421+
},
422+
role_groups: Default::default(),
423+
},
424+
middle_managers: Role {
425+
config: CommonConfiguration {
426+
config: DruidConfig {},
427+
config_overrides: Default::default(),
428+
env_overrides: Default::default(),
429+
cli_overrides: Default::default(),
430+
},
431+
role_groups: Default::default(),
432+
},
433+
routers: Role {
434+
config: CommonConfiguration {
435+
config: DruidConfig {},
436+
config_overrides: Default::default(),
437+
env_overrides: Default::default(),
438+
cli_overrides: Default::default(),
439+
},
440+
role_groups: HashMap::<_, _>::from_iter(IntoIter::new([(
441+
"default".to_string(),
442+
RoleGroup {
443+
config: CommonConfiguration {
444+
config: DruidConfig {},
445+
config_overrides: Default::default(),
446+
env_overrides: Default::default(),
447+
cli_overrides: Default::default(),
448+
},
449+
replicas: Some(1),
450+
selector: None,
451+
},
452+
)])),
453+
},
454+
metadata_storage_database: Default::default(),
455+
deep_storage: Default::default(),
456+
s3: None,
457+
zookeeper_reference: Default::default(),
458+
},
459+
);
460+
461+
cluster.metadata.namespace = Some("default".to_string());
462+
463+
assert_eq!(cluster.metadata.name, Some("testcluster".to_string()));
464+
465+
assert_eq!(
466+
cluster.role_service_name(&DruidRole::Router),
467+
Some("testcluster-router".to_string())
468+
);
469+
470+
assert_eq!(
471+
cluster.role_service_fqdn(&DruidRole::Router),
472+
Some("testcluster-router.default.svc.cluster.local".to_string())
473+
)
474+
}
475+
}

rust/operator-binary/src/discovery.rs

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
//! Discovery for Druid. We make Druid discoverable by putting a connection string to the router service
2+
//! inside a config map. We only provide a connection string to the router service, since it serves as
3+
//! a gateway to the cluster for client queries.
4+
5+
use snafu::{OptionExt, ResultExt, Snafu};
6+
use stackable_druid_crd::{DruidCluster, DruidRole, APP_NAME};
7+
use stackable_operator::{
8+
builder::{ConfigMapBuilder, ObjectMetaBuilder},
9+
k8s_openapi::api::core::v1::ConfigMap,
10+
kube::{runtime::reflector::ObjectRef, Resource, ResourceExt},
11+
};
12+
13+
use crate::druid_controller::druid_version;
14+
15+
#[derive(Snafu, Debug)]
16+
pub enum Error {
17+
#[snafu(display("object {} is missing metadata to build owner reference", druid))]
18+
ObjectMissingMetadataForOwnerRef {
19+
source: stackable_operator::error::Error,
20+
druid: ObjectRef<DruidCluster>,
21+
},
22+
#[snafu(display("failed to get service FQDN"))]
23+
NoServiceFqdn,
24+
#[snafu(display("failed to build ConfigMap"))]
25+
BuildConfigMap {
26+
source: stackable_operator::error::Error,
27+
},
28+
}
29+
30+
/// Builds discovery [`ConfigMap`]s for connecting to a [`DruidCluster`]
31+
pub async fn build_discovery_configmaps(
32+
owner: &impl Resource<DynamicType = ()>,
33+
druid: &DruidCluster,
34+
) -> Result<Vec<ConfigMap>, Error> {
35+
let name = owner.name();
36+
Ok(vec![build_discovery_configmap(&name, owner, druid)?])
37+
}
38+
39+
/// Build a discovery [`ConfigMap`] containing information about how to connect to a certain [`DruidCluster`]
40+
fn build_discovery_configmap(
41+
name: &str,
42+
owner: &impl Resource<DynamicType = ()>,
43+
druid: &DruidCluster,
44+
) -> Result<ConfigMap, Error> {
45+
let router_host = format!(
46+
"{}:{}",
47+
druid
48+
.role_service_fqdn(&DruidRole::Router)
49+
.with_context(|| NoServiceFqdn)?,
50+
DruidRole::Router.get_http_port()
51+
);
52+
let sqlalchemy_conn_str = format!("druid://{}/druid/v2/sql", router_host);
53+
let avatica_conn_str = format!(
54+
"jdbc:avatica:remote:url=http://{}/druid/v2/sql/avatica/",
55+
router_host
56+
);
57+
58+
ConfigMapBuilder::new()
59+
.metadata(
60+
ObjectMetaBuilder::new()
61+
.name_and_namespace(druid)
62+
.name(name)
63+
.ownerreference_from_resource(owner, None, Some(true))
64+
.with_context(|| ObjectMissingMetadataForOwnerRef {
65+
druid: ObjectRef::from_obj(druid),
66+
})?
67+
.with_recommended_labels(
68+
druid,
69+
APP_NAME,
70+
druid_version(druid).unwrap_or("unknown"),
71+
&DruidRole::Router.to_string(),
72+
"discovery",
73+
)
74+
.build(),
75+
)
76+
.add_data("DRUID_ROUTER", router_host)
77+
.add_data("DRUID_SQLALCHEMY", sqlalchemy_conn_str)
78+
.add_data("DRUID_AVATICA_JDBC", avatica_conn_str)
79+
.build()
80+
.context(BuildConfigMap)
81+
}

rust/operator-binary/src/druid_controller.rs

+23-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ use std::{
66
time::Duration,
77
};
88

9-
use crate::config::{get_jvm_config, get_log4j_config, get_runtime_properties};
9+
use crate::{
10+
config::{get_jvm_config, get_log4j_config, get_runtime_properties},
11+
discovery::{self, build_discovery_configmaps},
12+
};
1013
use snafu::{OptionExt, ResultExt, Snafu};
1114
use stackable_druid_crd::{
1215
DeepStorageType, DruidCluster, DruidRole, APP_NAME, CONTAINER_HTTP_PORT,
@@ -114,6 +117,12 @@ pub enum Error {
114117
PropertiesWriteError {
115118
source: stackable_operator::product_config::writer::PropertiesWriterError,
116119
},
120+
#[snafu(display("failed to build discovery ConfigMap"))]
121+
BuildDiscoveryConfig { source: discovery::Error },
122+
#[snafu(display("failed to apply discovery ConfigMap"))]
123+
ApplyDiscoveryConfig {
124+
source: stackable_operator::error::Error,
125+
},
117126
}
118127
type Result<T, E = Error> = std::result::Result<T, E>;
119128

@@ -210,6 +219,17 @@ pub async fn reconcile_druid(druid: DruidCluster, ctx: Context<Ctx>) -> Result<R
210219
}
211220
}
212221

222+
// discovery
223+
for discovery_cm in build_discovery_configmaps(&druid, &druid)
224+
.await
225+
.context(BuildDiscoveryConfig)?
226+
{
227+
client
228+
.apply_patch(FIELD_MANAGER_SCOPE, &discovery_cm, &discovery_cm)
229+
.await
230+
.context(ApplyDiscoveryConfig)?;
231+
}
232+
213233
Ok(ReconcilerAction {
214234
requeue_after: None,
215235
})
@@ -235,12 +255,12 @@ pub fn build_role_service(role_name: &str, druid: &DruidCluster) -> Result<Servi
235255
.build(),
236256
spec: Some(ServiceSpec {
237257
ports: Some(vec![ServicePort {
238-
name: Some("plaintext".to_string()),
258+
name: Some(CONTAINER_HTTP_PORT.to_string()),
239259
port: DruidRole::from_str(role_name)
240260
.unwrap()
241261
.get_http_port()
242262
.into(),
243-
target_port: Some(IntOrString::String("plaintext".to_string())),
263+
target_port: Some(IntOrString::String(CONTAINER_HTTP_PORT.to_string())),
244264
protocol: Some("TCP".to_string()),
245265
..ServicePort::default()
246266
}]),

rust/operator-binary/src/main.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod config;
2+
mod discovery;
23
mod druid_controller;
34

45
use futures::StreamExt;

0 commit comments

Comments
 (0)