1
+ use crate :: config:: Settings ;
1
2
use crate :: grpc:: rpc:: * ;
2
3
use crate :: state:: global:: GlobalState ;
3
4
use crate :: test_utils:: types:: { get_token_id_by_name, prec_token_id} ;
5
+ use crate :: types:: l2:: L2BlockSerde ;
6
+ use fluidex_common:: db:: models:: { l2_block, tablenames, task} ;
7
+ use fluidex_common:: db:: DbType ;
4
8
use fluidex_common:: types:: FrExt ;
9
+ use fluidex_common:: utils:: timeutil:: FTimestamp ;
5
10
use std:: sync:: { Arc , RwLock } ;
6
- use tonic:: Status ;
11
+ use tonic:: { Code , Status } ;
7
12
8
13
pub struct Controller {
14
+ db_pool : sqlx:: Pool < DbType > ,
9
15
state : Arc < RwLock < GlobalState > > ,
10
16
}
11
17
12
18
impl Controller {
13
- pub fn new ( state : Arc < RwLock < GlobalState > > ) -> Self {
14
- Self { state }
19
+ pub async fn new ( state : Arc < RwLock < GlobalState > > ) -> Self {
20
+ let db_pool = sqlx:: postgres:: PgPool :: connect ( Settings :: db ( ) ) . await . unwrap ( ) ;
21
+ Self { db_pool, state }
22
+ }
23
+
24
+ pub async fn l2_block_query ( & self , request : L2BlockQueryRequest ) -> Result < L2BlockQueryResponse , Status > {
25
+ let block_id = request. block_id ;
26
+ let l2_block = get_l2_block_by_id ( & self . db_pool , block_id) . await ?;
27
+
28
+ let status = match get_task_status_by_block_id ( & self . db_pool , block_id) . await ? {
29
+ task:: TaskStatus :: Inited => TaskStatus :: Inited ,
30
+ task:: TaskStatus :: Witgening => TaskStatus :: Witgening ,
31
+ task:: TaskStatus :: Ready => TaskStatus :: Ready ,
32
+ task:: TaskStatus :: Assigned => TaskStatus :: Assigned ,
33
+ task:: TaskStatus :: Proved => TaskStatus :: Proved ,
34
+ } ;
35
+
36
+ let witness: L2BlockSerde = serde_json:: from_value ( l2_block. witness ) . unwrap ( ) ;
37
+ let tx_num = witness. encoded_txs . len ( ) as u64 ;
38
+ let txs = witness
39
+ . encoded_txs
40
+ . iter ( )
41
+ . map ( |tx| l2_block_query_response:: Tx {
42
+ detail : tx. iter ( ) . map ( |fr_str| fr_str. 0 . to_decimal_string ( ) ) . collect ( ) ,
43
+ } )
44
+ . collect ( ) ;
45
+
46
+ Ok ( L2BlockQueryResponse {
47
+ new_root : l2_block. new_root ,
48
+ created_time : FTimestamp :: from ( & l2_block. created_time ) . 0 ,
49
+ tx_num,
50
+ real_tx_num : tx_num, // TODO: Needs to decode and filter out NOP txs.
51
+ status : status as i32 ,
52
+ txs,
53
+ } )
15
54
}
16
55
17
56
pub fn token_balance_query ( & self , request : TokenBalanceQueryRequest ) -> Result < TokenBalanceQueryResponse , Status > {
@@ -33,3 +72,40 @@ impl Controller {
33
72
} )
34
73
}
35
74
}
75
+
76
+ async fn get_l2_block_by_id ( db_pool : & sqlx:: Pool < DbType > , block_id : i64 ) -> Result < l2_block:: L2Block , Status > {
77
+ let stmt = format ! (
78
+ "select block_id, new_root, witness, created_time
79
+ from {}
80
+ where block_id = $1
81
+ order by created_time desc limit 1" ,
82
+ tablenames:: L2_BLOCK ,
83
+ ) ;
84
+ match sqlx:: query_as :: < _ , l2_block:: L2Block > ( & stmt)
85
+ . bind ( block_id)
86
+ . fetch_one ( db_pool)
87
+ . await
88
+ {
89
+ Ok ( l2_block) => Ok ( l2_block) ,
90
+ Err ( sqlx:: Error :: RowNotFound ) => Err ( Status :: new ( Code :: NotFound , "db l2_block record not found" ) ) ,
91
+ Err ( err) => {
92
+ println ! ( "{:?}" , err) ;
93
+ Err ( Status :: new ( Code :: Internal , "db table l2_block fetch error" ) )
94
+ }
95
+ }
96
+ }
97
+
98
+ async fn get_task_status_by_block_id ( db_pool : & sqlx:: Pool < DbType > , block_id : i64 ) -> Result < task:: TaskStatus , Status > {
99
+ let stmt = format ! (
100
+ "select status
101
+ from {}
102
+ where block_id = $1
103
+ order by created_time desc limit 1" ,
104
+ tablenames:: TASK ,
105
+ ) ;
106
+ match sqlx:: query_as ( & stmt) . bind ( block_id) . fetch_one ( db_pool) . await {
107
+ Ok ( ( task_status, ) ) => Ok ( task_status) ,
108
+ Err ( sqlx:: Error :: RowNotFound ) => Err ( Status :: new ( Code :: NotFound , "db task record not found" ) ) ,
109
+ Err ( _) => Err ( Status :: new ( Code :: Internal , "db table task fetch error" ) ) ,
110
+ }
111
+ }
0 commit comments