Skip to content

Commit 9564171

Browse files
authored
Schemas and Databases search through the query engine (#839)
* Done * Schemas ready * Extended databases view and volumes view * Databases ready * cargo clippy + fmt * cargo clippy + fmt * test fix * Requested changes * Requested changes
1 parent b48f790 commit 9564171

File tree

16 files changed

+349
-188
lines changed

16 files changed

+349
-188
lines changed

Cargo.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/api-ui/src/databases/error.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::error::ErrorResponse;
22
use crate::error::IntoStatusCode;
33
use axum::Json;
44
use axum::response::IntoResponse;
5+
use core_executor::error::ExecutionError;
56
use core_metastore::error::MetastoreError;
67
use http::StatusCode;
78
use snafu::prelude::*;
@@ -20,7 +21,7 @@ pub enum DatabasesAPIError {
2021
#[snafu(display("Update database error: {source}"))]
2122
Update { source: MetastoreError },
2223
#[snafu(display("Get databases error: {source}"))]
23-
List { source: MetastoreError },
24+
List { source: ExecutionError },
2425
}
2526

2627
// Select which status code to return.

crates/api-ui/src/databases/handlers.rs

Lines changed: 61 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
1-
use crate::databases::models::DatabasesParameters;
1+
use crate::OrderDirection;
22
use crate::state::AppState;
33
use crate::{
4+
SearchParameters,
45
databases::error::{DatabasesAPIError, DatabasesResult},
56
databases::models::{
67
Database, DatabaseCreatePayload, DatabaseCreateResponse, DatabaseResponse,
78
DatabaseUpdatePayload, DatabaseUpdateResponse, DatabasesResponse,
89
},
10+
downcast_string_column,
911
error::ErrorResponse,
1012
};
13+
use api_sessions::DFSessionId;
1114
use axum::{
1215
Json,
1316
extract::{Path, Query, State},
1417
};
18+
use core_executor::models::QueryResultData;
19+
use core_executor::query::QueryContext;
1520
use core_metastore::Database as MetastoreDatabase;
1621
use core_metastore::error::MetastoreError;
17-
use core_utils::scan_iterator::ScanIterator;
1822
use utoipa::OpenApi;
1923
use validator::Validate;
2024

@@ -81,11 +85,7 @@ pub async fn create_database(
8185
.create_database(&database.ident.clone(), database)
8286
.await
8387
.map_err(|e| DatabasesAPIError::Create { source: e })
84-
.map(|o| {
85-
Json(DatabaseCreateResponse {
86-
data: o.data.into(),
87-
})
88-
})
88+
.map(|o| Json(DatabaseCreateResponse { data: o.into() }))
8989
}
9090

9191
#[utoipa::path(
@@ -113,9 +113,7 @@ pub async fn get_database(
113113
Path(database_name): Path<String>,
114114
) -> DatabasesResult<Json<DatabaseResponse>> {
115115
match state.metastore.get_database(&database_name).await {
116-
Ok(Some(db)) => Ok(Json(DatabaseResponse {
117-
data: db.data.into(),
118-
})),
116+
Ok(Some(db)) => Ok(Json(DatabaseResponse { data: db.into() })),
119117
Ok(None) => Err(DatabasesAPIError::Get {
120118
source: MetastoreError::DatabaseNotFound {
121119
db: database_name.clone(),
@@ -194,20 +192,18 @@ pub async fn update_database(
194192
.update_database(&database_name, database)
195193
.await
196194
.map_err(|e| DatabasesAPIError::Update { source: e })
197-
.map(|o| {
198-
Json(DatabaseUpdateResponse {
199-
data: o.data.into(),
200-
})
201-
})
195+
.map(|o| Json(DatabaseUpdateResponse { data: o.into() }))
202196
}
203197

204198
#[utoipa::path(
205199
get,
206200
operation_id = "getDatabases",
207201
params(
208-
("cursor" = Option<String>, Query, description = "Databases cursor"),
202+
("offset" = Option<usize>, Query, description = "Databases offset"),
209203
("limit" = Option<usize>, Query, description = "Databases limit"),
210-
("search" = Option<String>, Query, description = "Databases search (start with)"),
204+
("search" = Option<String>, Query, description = "Databases search"),
205+
("order_by" = Option<String>, Query, description = "Order by: database_name (default), volume_name, created_at, updated_at"),
206+
("order_direction" = Option<OrderDirection>, Query, description = "Order direction: ASC, DESC (default)"),
211207
),
212208
tags = ["databases"],
213209
path = "/ui/databases",
@@ -223,30 +219,56 @@ pub async fn update_database(
223219
)
224220
)]
225221
#[tracing::instrument(level = "debug", skip(state), err, ret(level = tracing::Level::TRACE))]
222+
#[allow(clippy::unwrap_used)]
226223
pub async fn list_databases(
227-
Query(parameters): Query<DatabasesParameters>,
224+
DFSessionId(session_id): DFSessionId,
225+
Query(parameters): Query<SearchParameters>,
228226
State(state): State<AppState>,
229227
) -> DatabasesResult<Json<DatabasesResponse>> {
230-
state
231-
.metastore
232-
.iter_databases()
233-
.cursor(parameters.cursor.clone())
234-
.limit(parameters.limit)
235-
.token(parameters.search)
236-
.collect()
228+
let context = QueryContext::default();
229+
let sql_string = "SELECT * FROM slatedb.public.databases".to_string();
230+
let sql_string = parameters.search.map_or_else(|| sql_string.clone(), |search|
231+
format!("{sql_string} WHERE (database_name ILIKE '%{search}%' OR volume_name ILIKE '%{search}%')")
232+
);
233+
let sql_string = parameters.order_by.map_or_else(
234+
|| format!("{sql_string} ORDER BY database_name"),
235+
|order_by| format!("{sql_string} ORDER BY {order_by}"),
236+
);
237+
let sql_string = parameters.order_direction.map_or_else(
238+
|| format!("{sql_string} DESC"),
239+
|order_direction| format!("{sql_string} {order_direction}"),
240+
);
241+
let sql_string = parameters.offset.map_or_else(
242+
|| sql_string.clone(),
243+
|offset| format!("{sql_string} OFFSET {offset}"),
244+
);
245+
let sql_string = parameters.limit.map_or_else(
246+
|| sql_string.clone(),
247+
|limit| format!("{sql_string} LIMIT {limit}"),
248+
);
249+
let QueryResultData { records, .. } = state
250+
.execution_svc
251+
.query(&session_id, sql_string.as_str(), context)
237252
.await
238-
.map_err(|e| DatabasesAPIError::List {
239-
source: MetastoreError::UtilSlateDB { source: e },
240-
})
241-
.map(|o| {
242-
let next_cursor = o
243-
.iter()
244-
.last()
245-
.map_or(String::new(), |rw_object| rw_object.ident.clone());
246-
Json(DatabasesResponse {
247-
items: o.into_iter().map(|x| x.data.into()).collect(),
248-
current_cursor: parameters.cursor,
249-
next_cursor,
250-
})
251-
})
253+
.map_err(|e| DatabasesAPIError::List { source: e })?;
254+
let mut items = Vec::new();
255+
for record in records {
256+
let database_names = downcast_string_column(&record, "database_name")
257+
.map_err(|e| DatabasesAPIError::List { source: e })?;
258+
let volume_names = downcast_string_column(&record, "volume_name")
259+
.map_err(|e| DatabasesAPIError::List { source: e })?;
260+
let created_at_timestamps = downcast_string_column(&record, "created_at")
261+
.map_err(|e| DatabasesAPIError::List { source: e })?;
262+
let updated_at_timestamps = downcast_string_column(&record, "updated_at")
263+
.map_err(|e| DatabasesAPIError::List { source: e })?;
264+
for i in 0..record.num_rows() {
265+
items.push(Database {
266+
name: database_names.value(i).to_string(),
267+
volume: volume_names.value(i).to_string(),
268+
created_at: created_at_timestamps.value(i).to_string(),
269+
updated_at: updated_at_timestamps.value(i).to_string(),
270+
});
271+
}
272+
}
273+
Ok(Json(DatabasesResponse { items }))
252274
}

crates/api-ui/src/databases/models.rs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,35 @@
1-
use crate::default_limit;
1+
use core_metastore::RwObject;
22
use core_metastore::models::Database as MetastoreDatabase;
33
use serde::{Deserialize, Serialize};
4-
use utoipa::{IntoParams, ToSchema};
4+
use utoipa::ToSchema;
55

66
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, Eq, PartialEq)]
77
pub struct Database {
88
pub name: String,
99
pub volume: String,
10+
pub created_at: String,
11+
pub updated_at: String,
1012
}
1113

1214
impl From<MetastoreDatabase> for Database {
1315
fn from(db: MetastoreDatabase) -> Self {
1416
Self {
1517
name: db.ident,
1618
volume: db.volume,
19+
//TODO: fix this, we must use a different payload or change the test suite for dbs
20+
created_at: "ERROR".to_string(),
21+
updated_at: "ERROR".to_string(),
22+
}
23+
}
24+
}
25+
26+
impl From<RwObject<MetastoreDatabase>> for Database {
27+
fn from(db: RwObject<MetastoreDatabase>) -> Self {
28+
Self {
29+
name: db.data.ident,
30+
volume: db.data.volume,
31+
created_at: db.created_at.to_string(),
32+
updated_at: db.updated_at.to_string(),
1733
}
1834
}
1935
}
@@ -70,14 +86,4 @@ pub struct DatabaseResponse {
7086
#[serde(rename_all = "camelCase")]
7187
pub struct DatabasesResponse {
7288
pub items: Vec<Database>,
73-
pub current_cursor: Option<String>,
74-
pub next_cursor: String,
75-
}
76-
77-
#[derive(Debug, Deserialize, ToSchema, IntoParams)]
78-
pub struct DatabasesParameters {
79-
pub cursor: Option<String>,
80-
#[serde(default = "default_limit")]
81-
pub limit: Option<u16>,
82-
pub search: Option<String>,
8389
}

crates/api-ui/src/lib.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
use core_executor::error::ExecutionError;
2+
use datafusion::arrow::array::{RecordBatch, StringArray};
3+
use datafusion::common::DataFusionError;
4+
use serde::Deserialize;
5+
use std::fmt::Display;
6+
use utoipa::{IntoParams, ToSchema};
7+
18
pub mod auth;
29
pub mod config;
310
pub mod dashboard;
@@ -21,3 +28,48 @@ pub mod worksheets;
2128
const fn default_limit() -> Option<u16> {
2229
Some(250)
2330
}
31+
32+
#[derive(Debug, Deserialize, ToSchema, IntoParams)]
33+
#[serde(rename_all = "camelCase")]
34+
pub struct SearchParameters {
35+
pub offset: Option<usize>,
36+
#[serde(default = "default_limit")]
37+
pub limit: Option<u16>,
38+
pub search: Option<String>,
39+
pub order_by: Option<String>,
40+
pub order_direction: Option<OrderDirection>,
41+
}
42+
43+
#[derive(Debug, Deserialize, ToSchema)]
44+
#[serde(rename_all = "UPPERCASE")]
45+
pub enum OrderDirection {
46+
ASC,
47+
DESC,
48+
}
49+
50+
impl Default for OrderDirection {
51+
fn default() -> Self {
52+
Self::DESC
53+
}
54+
}
55+
56+
impl Display for OrderDirection {
57+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58+
match self {
59+
Self::ASC => write!(f, "ASC"),
60+
Self::DESC => write!(f, "DESC"),
61+
}
62+
}
63+
}
64+
65+
fn downcast_string_column<'a>(
66+
batch: &'a RecordBatch,
67+
name: &str,
68+
) -> Result<&'a StringArray, ExecutionError> {
69+
batch
70+
.column_by_name(name)
71+
.and_then(|col| col.as_any().downcast_ref::<StringArray>())
72+
.ok_or_else(|| ExecutionError::DataFusion {
73+
source: DataFusionError::Internal(format!("Missing or invalid column: '{name}'")),
74+
})
75+
}

crates/api-ui/src/navigation_trees/handlers.rs

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::downcast_string_column;
12
use crate::error::ErrorResponse;
23
use crate::navigation_trees::error::{NavigationTreesAPIError, NavigationTreesResult};
34
use crate::navigation_trees::models::{
@@ -8,9 +9,7 @@ use crate::state::AppState;
89
use api_sessions::DFSessionId;
910
use axum::extract::Query;
1011
use axum::{Json, extract::State};
11-
use core_executor::{error::ExecutionError, models::QueryResultData, query::QueryContext};
12-
use datafusion::arrow::array::{RecordBatch, StringArray};
13-
use datafusion::common::DataFusionError;
12+
use core_executor::{models::QueryResultData, query::QueryContext};
1413
use std::collections::BTreeMap;
1514
use utoipa::OpenApi;
1615

@@ -76,11 +75,14 @@ pub async fn get_navigation_trees(
7675
BTreeMap::new();
7776

7877
for batch in tree_batches {
79-
let databases = downcast_string_column(&batch, "database")?;
80-
let schemas = downcast_string_column(&batch, "schema")?;
81-
let tables = downcast_string_column(&batch, "table")?;
82-
let table_types = downcast_string_column(&batch, "table_type")?;
83-
78+
let databases = downcast_string_column(&batch, "database")
79+
.map_err(|e| NavigationTreesAPIError::Execution { source: e })?;
80+
let schemas = downcast_string_column(&batch, "schema")
81+
.map_err(|e| NavigationTreesAPIError::Execution { source: e })?;
82+
let tables = downcast_string_column(&batch, "table")
83+
.map_err(|e| NavigationTreesAPIError::Execution { source: e })?;
84+
let table_types = downcast_string_column(&batch, "table_type")
85+
.map_err(|e| NavigationTreesAPIError::Execution { source: e })?;
8486
for j in 0..batch.num_rows() {
8587
let database = databases.value(j).to_string();
8688
let schema = schemas.value(j).to_string();
@@ -133,17 +135,3 @@ pub async fn get_navigation_trees(
133135

134136
Ok(Json(NavigationTreesResponse { items }))
135137
}
136-
137-
fn downcast_string_column<'a>(
138-
batch: &'a RecordBatch,
139-
name: &str,
140-
) -> Result<&'a StringArray, NavigationTreesAPIError> {
141-
batch
142-
.column_by_name(name)
143-
.and_then(|col| col.as_any().downcast_ref::<StringArray>())
144-
.ok_or_else(|| NavigationTreesAPIError::Execution {
145-
source: ExecutionError::DataFusion {
146-
source: DataFusionError::Internal(format!("Missing or invalid column: '{name}'")),
147-
},
148-
})
149-
}

crates/api-ui/src/schemas/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub enum SchemasAPIError {
2121
#[snafu(display("Update schema error: {source}"))]
2222
Update { source: MetastoreError },
2323
#[snafu(display("Get schemas error: {source}"))]
24-
List { source: MetastoreError },
24+
List { source: ExecutionError },
2525
}
2626

2727
// Select which status code to return.

0 commit comments

Comments
 (0)