Skip to content

Commit e5564f2

Browse files
committed
basic cluster transport protocol working
1 parent efd6851 commit e5564f2

File tree

10 files changed

+375
-40
lines changed

10 files changed

+375
-40
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/clusterd/src/lib.rs

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use hyper_util::rt::TokioIo;
2020
use mz_build_info::{BuildInfo, build_info};
2121
use mz_cloud_resources::AwsExternalIdPrefix;
2222
use mz_compute::server::ComputeInstanceContext;
23-
use mz_compute_client::service::proto_compute_server::ProtoComputeServer;
2423
use mz_http_util::DynamicFilterTarget;
2524
use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
2625
use mz_ore::cli::{self, CliConfig};
@@ -32,10 +31,9 @@ use mz_persist_client::cache::PersistClientCache;
3231
use mz_persist_client::cfg::PersistConfig;
3332
use mz_persist_client::rpc::{GrpcPubSubClient, PersistPubSubClient, PersistPubSubClientConfig};
3433
use mz_service::emit_boot_diagnostics;
35-
use mz_service::grpc::{GrpcServer, GrpcServerMetrics, MAX_GRPC_MESSAGE_SIZE};
3634
use mz_service::secrets::SecretsReaderCliArgs;
35+
use mz_service::transport;
3736
use mz_storage::storage_state::StorageInstanceContext;
38-
use mz_storage_client::client::proto_storage_server::ProtoStorageServer;
3937
use mz_storage_types::connections::ConnectionContext;
4038
use mz_txn_wal::operator::TxnsContext;
4139
use tokio::runtime::Handle;
@@ -316,9 +314,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
316314
None,
317315
);
318316

319-
let grpc_host = args.grpc_host.and_then(|h| (!h.is_empty()).then_some(h));
320-
let grpc_server_metrics = GrpcServerMetrics::register_with(&metrics_registry);
321-
322317
// Start storage server.
323318
let storage_client_builder = mz_storage::serve(
324319
&metrics_registry,
@@ -335,13 +330,10 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
335330
);
336331
mz_ore::task::spawn(
337332
|| "storage_server",
338-
GrpcServer::serve(
339-
&grpc_server_metrics,
333+
transport::serve(
340334
args.storage_controller_listen_addr,
341335
BUILD_INFO.semver_version(),
342-
grpc_host.clone(),
343336
storage_client_builder,
344-
|svc| ProtoStorageServer::new(svc).max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
345337
),
346338
);
347339

@@ -363,13 +355,10 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
363355
);
364356
mz_ore::task::spawn(
365357
|| "compute_server",
366-
GrpcServer::serve(
367-
&grpc_server_metrics,
358+
transport::serve(
368359
args.compute_controller_listen_addr,
369360
BUILD_INFO.semver_version(),
370-
grpc_host,
371361
compute_client_builder,
372-
|svc| ProtoComputeServer::new(svc).max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
373362
),
374363
);
375364

src/compute-client/src/controller/replica.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99

1010
//! A client for replicas of a compute instance.
1111
12-
use std::sync::Arc;
12+
use std::str::FromStr;
1313
use std::sync::atomic::{self, AtomicBool};
14+
use std::sync::Arc;
1415
use std::time::{Duration, Instant};
1516

1617
use anyhow::bail;
@@ -19,13 +20,15 @@ use mz_cluster_client::client::{ClusterReplicaLocation, ClusterStartupEpoch, Tim
1920
use mz_compute_types::dyncfgs::ENABLE_COMPUTE_REPLICA_EXPIRATION;
2021
use mz_dyncfg::ConfigSet;
2122
use mz_ore::channel::InstrumentedUnboundedSender;
23+
use mz_ore::netio::SocketAddr;
2224
use mz_ore::retry::{Retry, RetryState};
2325
use mz_ore::task::AbortOnDropHandle;
2426
use mz_service::client::GenericClient;
2527
use mz_service::params::GrpcClientParameters;
28+
use mz_service::transport;
2629
use tokio::select;
2730
use tokio::sync::mpsc::error::SendError;
28-
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
31+
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
2932
use tracing::{debug, info, trace, warn};
3033

3134
use crate::controller::instance::ReplicaResponse;
@@ -36,7 +39,6 @@ use crate::metrics::IntCounter;
3639
use crate::metrics::ReplicaMetrics;
3740
use crate::protocol::command::{ComputeCommand, InitialComputeParameters, InstanceConfig};
3841
use crate::protocol::response::ComputeResponse;
39-
use crate::service::{ComputeClient, ComputeGrpcClient};
4042

4143
type Client<T> = SequentialHydration<T>;
4244

@@ -69,7 +71,6 @@ pub(super) struct ReplicaClient<T> {
6971
impl<T> ReplicaClient<T>
7072
where
7173
T: ComputeControllerTimestamp,
72-
ComputeGrpcClient: ComputeClient<T>,
7374
{
7475
pub(super) fn spawn(
7576
id: ReplicaId,
@@ -160,7 +161,6 @@ struct ReplicaTask<T> {
160161
impl<T> ReplicaTask<T>
161162
where
162163
T: ComputeControllerTimestamp,
163-
ComputeGrpcClient: ComputeClient<T>,
164164
{
165165
/// Asynchronously forwards commands to and responses from a single replica.
166166
async fn run(self) {
@@ -181,17 +181,15 @@ where
181181
async fn connect(&self) -> Client<T> {
182182
let try_connect = |retry: RetryState| {
183183
let addrs = &self.config.location.ctl_addrs;
184-
let dests = addrs
184+
let addrs = addrs
185185
.iter()
186-
.map(|addr| (addr.clone(), self.metrics.clone()))
186+
.map(|addr| SocketAddr::from_str(addr).unwrap())
187187
.collect();
188188
let version = self.build_info.semver_version();
189-
let client_params = &self.config.grpc_client;
190189

191190
async move {
192191
let connect_start = Instant::now();
193-
let connect_result =
194-
ComputeGrpcClient::connect_partitioned(dests, version, client_params).await;
192+
let connect_result = transport::Client::connect_partitioned(addrs, version).await;
195193
self.metrics.observe_connect_time(connect_start.elapsed());
196194

197195
connect_result.inspect_err(|error| {
@@ -233,7 +231,6 @@ where
233231
async fn run_message_loop(mut self, mut client: Client<T>) -> Result<(), anyhow::Error>
234232
where
235233
T: ComputeControllerTimestamp,
236-
ComputeGrpcClient: ComputeClient<T>,
237234
{
238235
let id = self.replica_id;
239236
let incarnation = self.epoch.replica();

src/compute-client/src/protocol/response.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ include!(concat!(
3636
/// from the compute controller.
3737
///
3838
/// [`ComputeCommand`]: super::command::ComputeCommand
39-
#[derive(Clone, Debug, PartialEq)]
39+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
4040
pub enum ComputeResponse<T = mz_repr::Timestamp> {
4141
/// `Frontiers` announces the advancement of the various frontiers of the specified compute
4242
/// collection.
@@ -320,7 +320,7 @@ impl Arbitrary for FrontiersResponse {
320320
///
321321
/// Note that each `Peek` expects to generate exactly one `PeekResponse`, i.e.
322322
/// we expect a 1:1 contract between `Peek` and `PeekResponse`.
323-
#[derive(Clone, Debug, PartialEq)]
323+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
324324
pub enum PeekResponse {
325325
/// Returned rows of a successful peek.
326326
Rows(RowCollection),

src/expr/src/row/collection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ include!(concat!(env!("OUT_DIR"), "/mz_expr.row.collection.rs"));
2929
///
3030
/// Note: the encoding format we use to represent [`Row`]s in this struct is
3131
/// not stable, and thus should never be persisted durably.
32-
#[derive(Default, Debug, Clone, PartialEq)]
32+
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
3333
pub struct RowCollection {
3434
/// Contiguous blob of encoded Rows.
3535
encoded: Bytes,

src/ore/src/netio/socket.rs

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::{fmt, io};
2323
use async_trait::async_trait;
2424
use tokio::fs;
2525
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
26-
use tokio::net::{self, TcpListener, TcpStream, UnixListener, UnixStream};
26+
use tokio::net::{self, tcp, unix, TcpListener, TcpStream, UnixListener, UnixStream};
2727
use tonic::transport::server::{Connected, TcpConnectInfo, UdsConnectInfo};
2828
use tracing::warn;
2929

@@ -440,6 +440,21 @@ impl Stream {
440440
Stream::Unix(stream) => stream,
441441
}
442442
}
443+
444+
/// Splits a stream into a read half and a write half, which can be used to read and write the
445+
/// stream concurrently.
446+
pub fn split(self) -> (StreamReadHalf, StreamWriteHalf) {
447+
match self {
448+
Stream::Tcp(stream) => {
449+
let (rx, tx) = stream.into_split();
450+
(StreamReadHalf::Tcp(rx), StreamWriteHalf::Tcp(tx))
451+
}
452+
Stream::Unix(stream) => {
453+
let (rx, tx) = stream.into_split();
454+
(StreamReadHalf::Unix(rx), StreamWriteHalf::Unix(tx))
455+
}
456+
}
457+
}
443458
}
444459

445460
impl AsyncRead for Stream {
@@ -489,6 +504,56 @@ impl Connected for Stream {
489504
}
490505
}
491506

507+
/// Read half of a [`Stream`], created by [`Stream::split`].
508+
#[derive(Debug)]
509+
pub enum StreamReadHalf {
510+
Tcp(tcp::OwnedReadHalf),
511+
Unix(unix::OwnedReadHalf),
512+
}
513+
514+
impl AsyncRead for StreamReadHalf {
515+
fn poll_read(
516+
self: Pin<&mut Self>,
517+
cx: &mut Context,
518+
buf: &mut ReadBuf,
519+
) -> Poll<io::Result<()>> {
520+
match self.get_mut() {
521+
Self::Tcp(rx) => Pin::new(rx).poll_read(cx, buf),
522+
Self::Unix(rx) => Pin::new(rx).poll_read(cx, buf),
523+
}
524+
}
525+
}
526+
527+
/// Write half of a [`Stream`], created by [`Stream::split`].
528+
#[derive(Debug)]
529+
pub enum StreamWriteHalf {
530+
Tcp(tcp::OwnedWriteHalf),
531+
Unix(unix::OwnedWriteHalf),
532+
}
533+
534+
impl AsyncWrite for StreamWriteHalf {
535+
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
536+
match self.get_mut() {
537+
Self::Tcp(tx) => Pin::new(tx).poll_write(cx, buf),
538+
Self::Unix(tx) => Pin::new(tx).poll_write(cx, buf),
539+
}
540+
}
541+
542+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
543+
match self.get_mut() {
544+
Self::Tcp(tx) => Pin::new(tx).poll_flush(cx),
545+
Self::Unix(tx) => Pin::new(tx).poll_flush(cx),
546+
}
547+
}
548+
549+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
550+
match self.get_mut() {
551+
Self::Tcp(tx) => Pin::new(tx).poll_shutdown(cx),
552+
Self::Unix(tx) => Pin::new(tx).poll_shutdown(cx),
553+
}
554+
}
555+
}
556+
492557
/// Connection information for a [`Stream`].
493558
#[derive(Debug, Clone)]
494559
pub enum ConnectInfo {

src/service/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ workspace = true
1313
anyhow = "1.0.95"
1414
async-stream = "0.3.3"
1515
async-trait = "0.1.83"
16+
bincode = "1.3.3"
1617
clap = { version = "4.5.23", features = ["env", "derive"] }
1718
crossbeam-channel = "0.5.15"
19+
flate2 = "1.1.0"
1820
futures = "0.3.31"
1921
http = "1.2.0"
2022
hyper-util = "0.1.6"

src/service/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@ pub mod params;
2222
pub mod retry;
2323
pub mod secrets;
2424
pub mod tracing;
25+
pub mod transport;

0 commit comments

Comments
 (0)