Skip to content

Commit 5970a4e

Browse files
committed
moving to simpler processor
1 parent d9c548c commit 5970a4e

16 files changed

+2026
-458
lines changed

Diff for: core/anything-server/src/main.rs

-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ extern crate slugify;
3333

3434
use auth::init::AuthState;
3535

36-
3736
mod system_plugins;
3837
mod system_workflows;
3938
mod processor;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,344 @@
1+
use chrono::Utc;
2+
use dotenv::dotenv;
3+
use serde_json::Value;
4+
use std::collections::HashSet;
5+
use std::{env, sync::Arc};
6+
use tracing::debug;
7+
use uuid::Uuid;
8+
9+
use crate::system_plugins::http::http_plugin::parse_headers;
10+
use crate::types::{
11+
task_types::{FlowSessionStatus, Task, TaskStatus, TriggerSessionStatus},
12+
workflow_types::DatabaseFlowVersion,
13+
};
14+
use crate::AppState;
15+
use chrono::DateTime;
16+
use serde::{Deserialize, Serialize};
17+
18+
#[derive(Debug, Deserialize, Serialize)]
19+
pub struct UpdateFlowSesssionInput {
20+
pub flow_session_status: String,
21+
pub trigger_session_status: String,
22+
}
23+
24+
#[derive(Debug, Deserialize, Serialize)]
25+
pub struct UpdateTaskInput {
26+
pub task_status: String,
27+
#[serde(skip_serializing_if = "Option::is_none")]
28+
pub started_at: Option<DateTime<Utc>>,
29+
#[serde(skip_serializing_if = "Option::is_none")]
30+
pub ended_at: Option<DateTime<Utc>>,
31+
#[serde(skip_serializing_if = "Option::is_none")]
32+
pub result: Option<Value>,
33+
#[serde(skip_serializing_if = "Option::is_none")]
34+
pub context: Option<Value>,
35+
#[serde(skip_serializing_if = "Option::is_none")]
36+
pub error: Option<Value>,
37+
}
38+
39+
pub async fn get_workflow_definition(
40+
state: Arc<AppState>,
41+
workflow_id: &Uuid,
42+
version_id: Option<&Uuid>, // Make version_id optional since webhooks don't have it
43+
) -> Result<DatabaseFlowVersion, String> {
44+
println!(
45+
"[PROCESSOR DB CALLS] Getting workflow definition for workflow_id: {}, version_id: {:?}",
46+
workflow_id, version_id
47+
);
48+
//Super User Access
49+
dotenv().ok();
50+
let supabase_service_role_api_key = env::var("SUPABASE_SERVICE_ROLE_API_KEY")
51+
.expect("SUPABASE_SERVICE_ROLE_API_KEY must be set");
52+
53+
// Get flow version from database
54+
let mut query = state
55+
.anything_client
56+
.from("flow_versions")
57+
.eq("flow_id", workflow_id.to_string());
58+
59+
// If version_id is provided, use it. Otherwise get published version
60+
if let Some(version) = version_id {
61+
query = query.eq("flow_version_id", version.to_string());
62+
} else {
63+
query = query.eq("published", "true");
64+
}
65+
66+
let response = query
67+
.auth(&supabase_service_role_api_key)
68+
.select("*")
69+
.single()
70+
.execute()
71+
.await
72+
.map_err(|e| {
73+
println!(
74+
"[PROCESSOR DB CALLS] Failed to execute workflow definition request: {}",
75+
e
76+
);
77+
format!("Failed to execute request: {}", e)
78+
})?;
79+
80+
let response_body = response.text().await.map_err(|e| {
81+
println!(
82+
"[PROCESSOR DB CALLS] Failed to read workflow definition response: {}",
83+
e
84+
);
85+
format!("Failed to read response body: {}", e)
86+
})?;
87+
88+
let workflow_version: DatabaseFlowVersion =
89+
serde_json::from_str(&response_body).map_err(|e| {
90+
println!("[PROCESSOR DB CALLS] No workflow version found: {}", e);
91+
String::from("No workflow version found")
92+
})?;
93+
94+
println!("[PROCESSOR DB CALLS] Successfully retrieved workflow definition");
95+
Ok(workflow_version)
96+
}
97+
98+
pub async fn get_session_tasks(
99+
state: Arc<AppState>,
100+
flow_session_id: &Uuid, //UUID
101+
) -> Result<Vec<Task>, String> {
102+
println!(
103+
"[PROCESSOR DB CALLS] Fetching tasks for flow_session_id {}",
104+
flow_session_id
105+
);
106+
107+
dotenv().ok();
108+
let supabase_service_role_api_key = env::var("SUPABASE_SERVICE_ROLE_API_KEY")
109+
.expect("SUPABASE_SERVICE_ROLE_API_KEY must be set");
110+
111+
let response = state
112+
.anything_client
113+
.from("tasks")
114+
.auth(supabase_service_role_api_key)
115+
.select("*")
116+
.eq("flow_session_id", flow_session_id.to_string())
117+
.order("processing_order.asc")
118+
.execute()
119+
.await
120+
.map_err(|e| {
121+
println!(
122+
"[PROCESSOR DB CALLS] Failed to execute session tasks request: {}",
123+
e
124+
);
125+
format!("Failed to execute request: {}", e)
126+
})?;
127+
128+
let response_body = response.text().await.map_err(|e| {
129+
println!(
130+
"[PROCESSOR DB CALLS] Failed to read session tasks response: {}",
131+
e
132+
);
133+
format!("Failed to read response body: {}", e)
134+
})?;
135+
136+
let tasks: Vec<Task> = serde_json::from_str(&response_body).map_err(|e| {
137+
println!("[PROCESSOR DB CALLS] Failed to parse tasks: {}", e);
138+
format!("Failed to parse tasks: {}", e)
139+
})?;
140+
141+
if tasks.is_empty() {
142+
println!(
143+
"[PROCESSOR DB CALLS] No tasks found for session {}",
144+
flow_session_id
145+
);
146+
return Err("No tasks found for session".to_string());
147+
}
148+
149+
println!(
150+
"[PROCESSOR DB CALLS] Successfully retrieved {} tasks",
151+
tasks.len()
152+
);
153+
Ok(tasks)
154+
}
155+
156+
pub async fn create_task(state: Arc<AppState>, task: &Task) -> Result<(), String> {
157+
println!("[PROCESSOR DB CALLS] Creating new task");
158+
dotenv().ok();
159+
let supabase_service_role_api_key = env::var("SUPABASE_SERVICE_ROLE_API_KEY")
160+
.expect("SUPABASE_SERVICE_ROLE_API_KEY must be set");
161+
162+
let response = state
163+
.anything_client
164+
.from("tasks")
165+
.auth(supabase_service_role_api_key)
166+
.insert(
167+
serde_json::to_value(task)
168+
.map_err(|e| {
169+
println!("[PROCESSOR DB CALLS] Failed to serialize task: {}", e);
170+
format!("Failed to serialize task: {}", e)
171+
})?
172+
.to_string(),
173+
)
174+
.execute()
175+
.await
176+
.map_err(|e| {
177+
println!(
178+
"[PROCESSOR DB CALLS] Failed to execute create task request: {}",
179+
e
180+
);
181+
format!("Failed to execute request: {}", e)
182+
})?;
183+
184+
let response_body = response.text().await.map_err(|e| {
185+
println!(
186+
"[PROCESSOR DB CALLS] Failed to read create task response: {}",
187+
e
188+
);
189+
format!("Failed to read response body: {}", e)
190+
})?;
191+
192+
let tasks: Vec<Task> = serde_json::from_str(&response_body).map_err(|e| {
193+
println!("[PROCESSOR DB CALLS] Failed to parse created task: {}", e);
194+
format!("Failed to parse created task: {}", e)
195+
})?;
196+
197+
let task = tasks.into_iter().next().ok_or_else(|| {
198+
println!("[PROCESSOR DB CALLS] No task was created");
199+
"No task was created".to_string()
200+
})?;
201+
202+
println!("[PROCESSOR DB CALLS] Successfully created task");
203+
Ok(())
204+
}
205+
206+
//Send just the data we need. Safer to not update every key.
207+
pub async fn update_task_status(
208+
state: Arc<AppState>,
209+
task_id: &Uuid,
210+
status: &TaskStatus,
211+
context: Option<Value>,
212+
result: Option<Value>,
213+
error: Option<Value>,
214+
started_at: Option<DateTime<Utc>>,
215+
ended_at: Option<DateTime<Utc>>,
216+
) -> Result<(), String> {
217+
println!(
218+
"[PROCESSOR DB CALLS] Updating task {} status to {}",
219+
task_id,
220+
status.as_str()
221+
);
222+
dotenv().ok();
223+
let supabase_service_role_api_key = env::var("SUPABASE_SERVICE_ROLE_API_KEY")
224+
.expect("SUPABASE_SERVICE_ROLE_API_KEY must be set");
225+
226+
//Remove sensitive headers from context
227+
let cleaned_context = if let Some(context) = context {
228+
Some(redact_headers_from_context(&context))
229+
} else {
230+
None
231+
};
232+
233+
let input = UpdateTaskInput {
234+
task_status: status.as_str().to_string(),
235+
started_at,
236+
ended_at,
237+
result,
238+
context: cleaned_context,
239+
error,
240+
};
241+
242+
state
243+
.anything_client
244+
.from("tasks")
245+
.auth(supabase_service_role_api_key)
246+
.eq("task_id", &task_id.to_string())
247+
.update(serde_json::to_string(&input).map_err(|e| {
248+
println!(
249+
"[PROCESSOR DB CALLS] Failed to serialize update input: {}",
250+
e
251+
);
252+
format!("Failed to serialize input: {}", e)
253+
})?)
254+
.execute()
255+
.await
256+
.map_err(|e| {
257+
println!(
258+
"[PROCESSOR DB CALLS] Failed to execute update task request: {}",
259+
e
260+
);
261+
format!("Failed to execute request: {}", e)
262+
})?;
263+
264+
println!("[PROCESSOR DB CALLS] Successfully updated task status");
265+
Ok(())
266+
}
267+
268+
pub async fn update_flow_session_status(
269+
state: &AppState,
270+
flow_session_id: &Uuid,
271+
flow_session_status: &FlowSessionStatus,
272+
trigger_session_status: &TriggerSessionStatus,
273+
) -> Result<(), String> {
274+
println!(
275+
"[PROCESSOR DB CALLS] Updating flow session {} status to {} and trigger status to {}",
276+
flow_session_id,
277+
flow_session_status.as_str(),
278+
trigger_session_status.as_str()
279+
);
280+
dotenv().ok();
281+
let supabase_service_role_api_key = env::var("SUPABASE_SERVICE_ROLE_API_KEY")
282+
.expect("SUPABASE_SERVICE_ROLE_API_KEY must be set");
283+
284+
let input = UpdateFlowSesssionInput {
285+
flow_session_status: flow_session_status.as_str().to_string(),
286+
trigger_session_status: trigger_session_status.as_str().to_string(),
287+
};
288+
289+
state
290+
.anything_client
291+
.from("tasks")
292+
.auth(supabase_service_role_api_key)
293+
.eq("flow_session_id", &flow_session_id.to_string())
294+
.update(serde_json::to_string(&input).map_err(|e| {
295+
println!(
296+
"[PROCESSOR DB CALLS] Failed to serialize update input: {}",
297+
e
298+
);
299+
format!("Failed to serialize input: {}", e)
300+
})?)
301+
.execute()
302+
.await
303+
.map_err(|e| {
304+
println!(
305+
"[PROCESSOR DB CALLS] Failed to execute update flow session request: {}",
306+
e
307+
);
308+
format!("Failed to execute request: {}", e)
309+
})?;
310+
311+
println!("[PROCESSOR DB CALLS] Successfully updated flow session status");
312+
Ok(())
313+
}
314+
315+
pub fn redact_headers_from_context(context: &Value) -> Value {
316+
let mut new_context = context.clone();
317+
318+
// Parse headers using parse_headers helper
319+
let headers = parse_headers(context);
320+
321+
// Create redacted headers object
322+
let redacted_headers = headers
323+
.into_iter()
324+
.map(|(key, _value)| {
325+
(
326+
key,
327+
"REDACTED_FROM_VIEWING_HERE_FOR_SECURITY_REASONS_BY_ANYTHING".to_string(),
328+
)
329+
})
330+
.collect::<Vec<_>>();
331+
332+
// Convert back to Value object
333+
let headers_obj = redacted_headers
334+
.into_iter()
335+
.map(|(k, v)| (k, Value::String(v)))
336+
.collect();
337+
338+
// Update the context with redacted headers
339+
if let Some(headers) = new_context.get_mut("headers") {
340+
*headers = Value::Object(headers_obj);
341+
}
342+
343+
new_context
344+
}

0 commit comments

Comments
 (0)