Skip to content

Commit 363a029

Browse files
committed
Refresh oximeter producer list more aggressively
- Remove `oximeter` producer HTTP endpoint for registering individual producers. - Dramatically reduce interval on which `oximeter` collector refreshes its list of producers. This is now the only way the collector learns of producers. The interval is also much smaller in tests to ensure pretty snappy registrations - Remove calls from both Nexus and the `oximeter` standalone mock Nexus for registering producers - Have `oximeter` collector start polling producers immediately, rather than waiting for the first polling interval to expire. - Closes #6916, #6895, and #6901
1 parent 0f8db77 commit 363a029

File tree

8 files changed

+13
-112
lines changed

8 files changed

+13
-112
lines changed

nexus/src/app/oximeter.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use oximeter_client::Client as OximeterClient;
1919
use oximeter_db::query::Timestamp;
2020
use oximeter_db::Measurement;
2121
use slog::Logger;
22-
use std::convert::TryInto;
2322
use std::net::SocketAddr;
2423
use std::num::NonZeroU32;
2524
use std::time::Duration;
@@ -118,6 +117,9 @@ impl super::Nexus {
118117
}
119118

120119
/// Assign a newly-registered metric producer to an oximeter collector server.
120+
///
121+
/// Note that we don't send the registration to the collector, the collector
122+
/// polls for its list of producers periodically.
121123
pub(crate) async fn assign_producer(
122124
&self,
123125
opctx: &OpContext,
@@ -127,20 +129,6 @@ impl super::Nexus {
127129
.db_datastore
128130
.producer_endpoint_upsert_and_assign(opctx, &producer_info)
129131
.await?;
130-
131-
let address = SocketAddr::from((
132-
collector_info.ip.ip(),
133-
collector_info.port.try_into().unwrap(),
134-
));
135-
let collector =
136-
build_oximeter_client(&self.log, &collector_info.id, address);
137-
138-
collector
139-
.producers_post(&oximeter_client::types::ProducerEndpoint::from(
140-
&producer_info,
141-
))
142-
.await
143-
.map_err(Error::from)?;
144132
info!(
145133
self.log,
146134
"assigned collector to new producer";

nexus/test-utils/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1477,7 +1477,7 @@ pub async fn start_oximeter(
14771477
let config = oximeter_collector::Config {
14781478
nexus_address: Some(nexus_address),
14791479
db,
1480-
refresh_interval: oximeter_collector::default_refresh_interval(),
1480+
refresh_interval: Duration::from_secs(2),
14811481
log: ConfigLogging::StderrTerminal { level: ConfigLoggingLevel::Error },
14821482
};
14831483
let args = oximeter_collector::OximeterArguments {

openapi/oximeter.json

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -81,31 +81,6 @@
8181
"x-dropshot-pagination": {
8282
"required": []
8383
}
84-
},
85-
"post": {
86-
"summary": "Handle a request from Nexus to register a new producer with this collector.",
87-
"operationId": "producers_post",
88-
"requestBody": {
89-
"content": {
90-
"application/json": {
91-
"schema": {
92-
"$ref": "#/components/schemas/ProducerEndpoint"
93-
}
94-
}
95-
},
96-
"required": true
97-
},
98-
"responses": {
99-
"204": {
100-
"description": "resource updated"
101-
},
102-
"4XX": {
103-
"$ref": "#/components/responses/Error"
104-
},
105-
"5XX": {
106-
"$ref": "#/components/responses/Error"
107-
}
108-
}
10984
}
11085
},
11186
"/producers/{producer_id}": {

oximeter/api/src/lib.rs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
use chrono::{DateTime, Utc};
66
use dropshot::{
77
EmptyScanParams, HttpError, HttpResponseDeleted, HttpResponseOk,
8-
HttpResponseUpdatedNoContent, PaginationParams, Query, RequestContext,
9-
ResultsPage, TypedBody,
8+
PaginationParams, Query, RequestContext, ResultsPage,
109
};
1110
use omicron_common::api::internal::nexus::ProducerEndpoint;
1211
use schemars::JsonSchema;
@@ -17,16 +16,6 @@ use uuid::Uuid;
1716
pub trait OximeterApi {
1817
type Context;
1918

20-
/// Handle a request from Nexus to register a new producer with this collector.
21-
#[endpoint {
22-
method = POST,
23-
path = "/producers",
24-
}]
25-
async fn producers_post(
26-
request_context: RequestContext<Self::Context>,
27-
body: TypedBody<ProducerEndpoint>,
28-
) -> Result<HttpResponseUpdatedNoContent, HttpError>;
29-
3019
/// List all producers.
3120
#[endpoint {
3221
method = GET,

oximeter/collector/src/agent.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,6 @@ async fn collection_task(
155155
let mut log = orig_log.new(o!("address" => producer.address));
156156
let client = reqwest::Client::new();
157157
let mut collection_timer = interval(producer.interval);
158-
collection_timer.tick().await; // completes immediately
159158
debug!(
160159
log,
161160
"starting oximeter collection task";

oximeter/collector/src/http_entrypoints.rs

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,18 @@
44

55
//! Oximeter collector server HTTP API
66
7-
// Copyright 2023 Oxide Computer Company
7+
// Copyright 2024 Oxide Computer Company
88

99
use crate::OximeterAgent;
1010
use dropshot::ApiDescription;
1111
use dropshot::EmptyScanParams;
1212
use dropshot::HttpError;
1313
use dropshot::HttpResponseDeleted;
1414
use dropshot::HttpResponseOk;
15-
use dropshot::HttpResponseUpdatedNoContent;
1615
use dropshot::PaginationParams;
1716
use dropshot::Query;
1817
use dropshot::RequestContext;
1918
use dropshot::ResultsPage;
20-
use dropshot::TypedBody;
2119
use dropshot::WhichPage;
2220
use omicron_common::api::internal::nexus::ProducerEndpoint;
2321
use oximeter_api::*;
@@ -34,19 +32,6 @@ enum OximeterApiImpl {}
3432
impl OximeterApi for OximeterApiImpl {
3533
type Context = Arc<OximeterAgent>;
3634

37-
async fn producers_post(
38-
request_context: RequestContext<Self::Context>,
39-
body: TypedBody<ProducerEndpoint>,
40-
) -> Result<HttpResponseUpdatedNoContent, HttpError> {
41-
let agent = request_context.context();
42-
let producer_info = body.into_inner();
43-
agent
44-
.register_producer(producer_info)
45-
.await
46-
.map_err(HttpError::from)
47-
.map(|_| HttpResponseUpdatedNoContent())
48-
}
49-
5035
async fn producers_list(
5136
request_context: RequestContext<Arc<OximeterAgent>>,
5237
query: Query<PaginationParams<EmptyScanParams, ProducerPage>>,

oximeter/collector/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ impl DbConfig {
127127

128128
/// Default interval on which we refresh our list of producers from Nexus.
129129
pub const fn default_refresh_interval() -> Duration {
130-
Duration::from_secs(60 * 10)
130+
Duration::from_secs(15)
131131
}
132132

133133
/// Configuration used to initialize an oximeter server

oximeter/collector/src/standalone.rs

Lines changed: 6 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use nexus_types::internal_api::params::OximeterInfo;
2222
use omicron_common::api::internal::nexus::ProducerEndpoint;
2323
use omicron_common::api::internal::nexus::ProducerRegistrationResponse;
2424
use omicron_common::FileKv;
25-
use oximeter_client::Client;
2625
use rand::seq::IteratorRandom;
2726
use slog::debug;
2827
use slog::error;
@@ -104,21 +103,14 @@ impl StandaloneNexus {
104103
//
105104
// Select a random collector, and assign it to the producer.
106105
// We'll return the assignment from this match block.
107-
let Some((collector_id, collector_info)) =
106+
let Some((collector_id, _collector_info)) =
108107
inner.random_collector()
109108
else {
110109
return Err(HttpError::for_unavail(
111110
None,
112111
String::from("No collectors available"),
113112
));
114113
};
115-
let client = Client::new(
116-
format!("http://{}", collector_info.address).as_str(),
117-
self.log.clone(),
118-
);
119-
client.producers_post(&info.into()).await.map_err(|e| {
120-
HttpError::for_internal_error(e.to_string())
121-
})?;
122114
let assignment =
123115
ProducerAssignment { producer: info.clone(), collector_id };
124116
assignment
@@ -131,18 +123,9 @@ impl StandaloneNexus {
131123
}
132124

133125
// This appears to be a re-registration, e.g., the producer
134-
// changed its IP address. Re-register it with the collector to
135-
// which it's already assigned.
126+
// changed its IP address. The collector will learn of this when
127+
// it next fetches its list.
136128
let collector_id = existing_assignment.collector_id;
137-
let collector_info =
138-
inner.collectors.get(&collector_id).unwrap();
139-
let client = Client::new(
140-
format!("http://{}", collector_info.address).as_str(),
141-
self.log.clone(),
142-
);
143-
client.producers_post(&info.into()).await.map_err(|e| {
144-
HttpError::for_internal_error(e.to_string())
145-
})?;
146129
ProducerAssignment { producer: info.clone(), collector_id }
147130
}
148131
};
@@ -154,27 +137,9 @@ impl StandaloneNexus {
154137
&self,
155138
info: OximeterInfo,
156139
) -> Result<(), HttpError> {
157-
// If this is being registered again, send all its assignments again.
158-
let mut inner = self.inner.lock().await;
159-
if inner.collectors.insert(info.collector_id, info).is_some() {
160-
let client = Client::new(
161-
format!("http://{}", info.address).as_str(),
162-
self.log.clone(),
163-
);
164-
for producer_info in
165-
inner.producers.values().filter_map(|assignment| {
166-
if assignment.collector_id == info.collector_id {
167-
Some(&assignment.producer)
168-
} else {
169-
None
170-
}
171-
})
172-
{
173-
client.producers_post(&producer_info.into()).await.map_err(
174-
|e| HttpError::for_internal_error(e.to_string()),
175-
)?;
176-
}
177-
}
140+
// No-op if this is being re-registered. It will fetch its list of
141+
// producers again if needed.
142+
self.inner.lock().await.collectors.insert(info.collector_id, info);
178143
Ok(())
179144
}
180145
}

0 commit comments

Comments
 (0)