Skip to content

Commit ba6b86c

Browse files
committed
feat(subscriber) expose server parts
The `ConsoleLayer` builder provides the user with a console layer and a server, which is used to start the gRPC server. However, it may be desireable to expose the instrumentation server together with other services on the same Tonic router. This was requested explicitly in #428. Additionally, to add tests which make use of the instrumentation server (as part of improving test coverage for #450), more flexibility is needed than what is provided by the current API. Specifically we would like to connect a client and server via an in memory channel, rather than a TCP connection. This change adds an additional method to `console_subscriber::Server` called `into_parts` which allows the user to access the `InstrumentServer` directly. A handle which controls the lifetime of the `Aggregator` is also provided, as the user must ensure that the aggregator lives at least as long as the instrument server. To facilitate the addition of functionality which would result in more "parts" in the future, `into_parts` returns a non-exhaustive struct, rather than a tuple of parts. Closes: #428
1 parent 7c8e80a commit ba6b86c

File tree

1 file changed

+107
-12
lines changed

1 file changed

+107
-12
lines changed

console-subscriber/src/lib.rs

+107-12
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#![doc = include_str!("../README.md")]
22
use console_api as proto;
3-
use proto::resources::resource;
3+
use proto::{instrument::instrument_server::InstrumentServer, resources::resource};
44
use serde::Serialize;
55
use std::{
66
cell::RefCell,
@@ -15,7 +15,10 @@ use std::{
1515
use thread_local::ThreadLocal;
1616
#[cfg(unix)]
1717
use tokio::net::UnixListener;
18-
use tokio::sync::{mpsc, oneshot};
18+
use tokio::{
19+
sync::{mpsc, oneshot},
20+
task::JoinHandle,
21+
};
1922
#[cfg(unix)]
2023
use tokio_stream::wrappers::UnixListenerStream;
2124
use tracing_core::{
@@ -933,18 +936,15 @@ impl Server {
933936
///
934937
/// [`tonic`]: https://docs.rs/tonic/
935938
pub async fn serve_with(
936-
mut self,
939+
self,
937940
mut builder: tonic::transport::Server,
938941
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
939-
let aggregate = self
940-
.aggregator
941-
.take()
942-
.expect("cannot start server multiple times");
943-
let aggregate = spawn_named(aggregate.run(), "console::aggregate");
944942
let addr = self.addr.clone();
945-
let router = builder.add_service(
946-
proto::instrument::instrument_server::InstrumentServer::new(self),
947-
);
943+
let ServerParts {
944+
instrument_server: service,
945+
aggregator_handle: aggregate,
946+
} = self.into_parts();
947+
let router = builder.add_service(service);
948948
let res = match addr {
949949
ServerAddr::Tcp(addr) => {
950950
let serve = router.serve(addr);
@@ -957,9 +957,104 @@ impl Server {
957957
spawn_named(serve, "console::serve").await
958958
}
959959
};
960-
aggregate.abort();
960+
drop(aggregate);
961961
res?.map_err(Into::into)
962962
}
963+
964+
/// Returns the parts needed to spawn a gRPC server and keep the aggregation
965+
/// worker running.
966+
///
967+
/// # Examples
968+
///
969+
/// The parts can be used to serve the instrument server together with
970+
/// other endpoints from the same gRPC server.
971+
///
972+
/// ```
973+
/// use console_subscriber::{ConsoleLayer, ServerParts};
974+
///
975+
/// # let runtime = tokio::runtime::Builder::new_current_thread()
976+
/// # .enable_all()
977+
/// # .build()
978+
/// # .unwrap();
979+
/// # runtime.block_on(async {
980+
/// let (console_layer, server) = ConsoleLayer::builder().build();
981+
/// let ServerParts {
982+
/// instrument_server,
983+
/// aggregator_handle,
984+
/// ..
985+
/// } = server.into_parts();
986+
///
987+
/// let router = tonic::transport::Server::builder()
988+
/// //.add_service(some_other_service)
989+
/// .add_service(instrument_server);
990+
/// let serve = router.serve(std::net::SocketAddr::new(
991+
/// std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
992+
/// 6669,
993+
/// ));
994+
///
995+
/// // Finally, spawn the server.
996+
/// tokio::spawn(serve);
997+
/// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused.
998+
/// # drop(console_layer);
999+
/// # drop(aggregator_handle);
1000+
/// # });
1001+
/// ```
1002+
pub fn into_parts(mut self) -> ServerParts {
1003+
let aggregate = self
1004+
.aggregator
1005+
.take()
1006+
.expect("cannot start server multiple times");
1007+
let aggregate = spawn_named(aggregate.run(), "console::aggregate");
1008+
1009+
let service = proto::instrument::instrument_server::InstrumentServer::new(self);
1010+
1011+
ServerParts {
1012+
instrument_server: service,
1013+
aggregator_handle: AggregatorHandle {
1014+
join_handle: aggregate,
1015+
},
1016+
}
1017+
}
1018+
}
1019+
1020+
/// Server Parts
1021+
///
1022+
/// This struct contains the parts returned by [`Server::into_parts`]. It may contain
1023+
/// further parts in the future, an as such is marked as [`non_exhaustive`].
1024+
///
1025+
/// The `InstrumentServer<Server>` can be used to construct a router which
1026+
/// can be added to a [`tonic`] gRPC server.
1027+
///
1028+
/// The [`AggregatorHandle`] must be kept until after the server has been
1029+
/// shut down.
1030+
///
1031+
/// See the [`Server::into_parts`] documentation for usage.
1032+
#[non_exhaustive]
1033+
pub struct ServerParts {
1034+
/// The instrument server.
1035+
///
1036+
/// See the documentation for [`InstrumentServer`] for details.
1037+
pub instrument_server: InstrumentServer<Server>,
1038+
1039+
/// The aggregate handle.
1040+
///
1041+
/// See the documentation for [`AggregatorHandle`] for details.
1042+
pub aggregator_handle: AggregatorHandle,
1043+
}
1044+
1045+
/// Aggregator handle.
1046+
///
1047+
/// This object is returned from [`Server::into_parts`] and must be
1048+
/// kept as long as the `InstrumentServer<Server>` - which is also
1049+
/// returned - is in use.
1050+
pub struct AggregatorHandle {
1051+
join_handle: JoinHandle<()>,
1052+
}
1053+
1054+
impl Drop for AggregatorHandle {
1055+
fn drop(&mut self) {
1056+
self.join_handle.abort();
1057+
}
9631058
}
9641059

9651060
#[tonic::async_trait]

0 commit comments

Comments
 (0)