29
29
#include " elastic_client.hpp"
30
30
#include " exceptions.hpp"
31
31
#include " mappings.hpp"
32
- #include " deserializer .hpp"
32
+ #include " serializer .hpp"
33
33
#include " bulker.hpp"
34
34
#include " ThreadPool/ThreadPool.h"
35
35
@@ -126,10 +126,6 @@ class elasticsearch_plugin_impl {
126
126
bool store_transaction_traces = true ;
127
127
bool store_action_traces = true ;
128
128
129
- std::unique_ptr<elastic_client> es_client;
130
- std::unique_ptr<deserializer> abi_deserializer;
131
- std::unique_ptr<bulker_pool> bulk_pool;
132
- std::unique_ptr<ThreadPool> thread_pool;
133
129
size_t max_task_queue_size = 0 ;
134
130
int task_queue_sleep_time = 0 ;
135
131
@@ -153,6 +149,11 @@ class elasticsearch_plugin_impl {
153
149
boost::atomic<bool > startup{true };
154
150
fc::optional<chain::chain_id_type> chain_id;
155
151
152
+ std::unique_ptr<elastic_client> es_client;
153
+ std::unique_ptr<serializer> serializer;
154
+ std::unique_ptr<bulker_pool> bulk_pool;
155
+ std::unique_ptr<ThreadPool> thread_pool;
156
+
156
157
static const action_name newaccount;
157
158
static const action_name setabi;
158
159
static const action_name updateauth;
@@ -507,7 +508,7 @@ void elasticsearch_plugin_impl::upsert_account_setabi(
507
508
{
508
509
abi_def abi_def = fc::raw::unpack<chain::abi_def>( setabi.abi );
509
510
510
- abi_deserializer ->upsert_abi_cache ( setabi.account , abi_def );
511
+ serializer ->upsert_abi_cache ( setabi.account , abi_def );
511
512
512
513
param_doc (" name" , setabi.account .to_string ());
513
514
param_doc (" abi" , abi_def);
@@ -687,7 +688,7 @@ void elasticsearch_plugin_impl::_process_applied_transaction( chain::transaction
687
688
for (auto & atrace : base_action_traces) {
688
689
fc::mutable_variant_object action_traces_doc;
689
690
chain::base_action_trace &base = atrace.get ();
690
- fc::from_variant ( abi_deserializer ->to_variant_with_abi ( base ), action_traces_doc );
691
+ fc::from_variant ( serializer ->to_variant_with_abi ( base ), action_traces_doc );
691
692
692
693
fc::mutable_variant_object act_doc;
693
694
fc::from_variant ( action_traces_doc[" act" ], act_doc );
@@ -699,7 +700,7 @@ void elasticsearch_plugin_impl::_process_applied_transaction( chain::transaction
699
700
fc::mutable_variant_object action_doc;
700
701
action_doc (" _index" , action_traces_index);
701
702
action_doc (" _type" , " _doc" );
702
- action_doc (" _id" , action_traces_doc[ " receipt" ][ " global_sequence" ] );
703
+ action_doc (" _id" , base. receipt . global_sequence );
703
704
action_doc (" retry_on_conflict" , 100 );
704
705
705
706
auto action = fc::json::to_string ( fc::variant_object (" index" , action_doc) );
@@ -713,7 +714,7 @@ void elasticsearch_plugin_impl::_process_applied_transaction( chain::transaction
713
714
// transaction trace index
714
715
715
716
fc::mutable_variant_object trans_traces_doc;
716
- fc::from_variant ( abi_deserializer ->to_variant_with_abi ( *t ), trans_traces_doc );
717
+ fc::from_variant ( serializer ->to_variant_with_abi ( *t ), trans_traces_doc );
717
718
trans_traces_doc (" createAt" , now.count ());
718
719
719
720
fc::mutable_variant_object action_doc;
@@ -751,7 +752,7 @@ void elasticsearch_plugin_impl::_process_accepted_transaction( chain::transactio
751
752
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
752
753
std::chrono::microseconds{fc::time_point::now ().time_since_epoch ().count ()} );
753
754
754
- fc::from_variant ( abi_deserializer ->to_variant_with_abi ( trx ), trans_doc );
755
+ fc::from_variant ( serializer ->to_variant_with_abi ( trx ), trans_doc );
755
756
trans_doc (" trx_id" , trx_id_str);
756
757
757
758
fc::variant signing_keys;
@@ -794,7 +795,7 @@ void elasticsearch_plugin_impl::_process_accepted_block( chain::block_state_ptr
794
795
[ bs{std::move (bs)}, this ]()
795
796
{
796
797
auto block_num = bs->block_num ;
797
- if ( block_num % 1000 == 0 )
798
+ if ( block_num % 10000 == 0 )
798
799
ilog ( " block_num: ${b}" , (" b" , block_num) );
799
800
800
801
const auto block_id = bs->id ;
@@ -856,7 +857,7 @@ void elasticsearch_plugin_impl::_process_accepted_block( chain::block_state_ptr
856
857
857
858
params_doc (" block_num" , static_cast <int32_t >(block_num));
858
859
params_doc (" block_id" , block_id_str);
859
- params_doc (" block" , abi_deserializer ->to_variant_with_abi ( *bs->block ));
860
+ params_doc (" block" , serializer ->to_variant_with_abi ( *bs->block ));
860
861
params_doc (" irreversible" , false );
861
862
params_doc (" createAt" , now.count ());
862
863
@@ -946,15 +947,14 @@ void elasticsearch_plugin_impl::_process_irreversible_block(chain::block_state_p
946
947
947
948
block_doc (" block_num" , static_cast <int32_t >(block_num));
948
949
block_doc (" block_id" , block_id_str);
949
- block_doc (" block" , abi_deserializer ->to_variant_with_abi ( *bs->block ));
950
+ block_doc (" block" , serializer ->to_variant_with_abi ( *bs->block ));
950
951
block_doc (" irreversible" , true );
951
952
block_doc (" validated" , bs->validated );
952
953
block_doc (" createAt" , now.count ());
953
954
954
955
doc (" script" , script_doc);
955
956
doc (" upsert" , block_doc);
956
957
957
-
958
958
fc::mutable_variant_object action_doc;
959
959
action_doc (" _index" , blocks_index);
960
960
action_doc (" _type" , " _doc" );
@@ -1238,7 +1238,7 @@ void elasticsearch_plugin::plugin_initialize(const variables_map& options) {
1238
1238
EOS_ASSERT (max_time > chain::config::default_abi_serializer_max_time_ms,
1239
1239
chain::plugin_config_exception, " --abi-serializer-max-time-ms required as default value not appropriate for parsing full blocks" );
1240
1240
fc::microseconds abi_serializer_max_time = app ().get_plugin <chain_plugin>().get_abi_serializer_max_time ();
1241
- my->abi_deserializer .reset ( new deserializer ( abi_serializer_max_time ));
1241
+ my->serializer .reset ( new serializer ( app (). data_dir () / " abi " , abi_serializer_max_time ));
1242
1242
}
1243
1243
1244
1244
if ( options.count ( " elastic-queue-size" )) {
0 commit comments