diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..a07ac60 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "ThreadPool"] + path = ThreadPool + url = https://github.com/EOSLaoMao/ThreadPool.git diff --git a/CMakeLists.txt b/CMakeLists.txt index c0f8742..71f3cd2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,6 @@ file(GLOB HEADERS "include/eosio/elasticsearch_plugin/*.hpp") add_library( elasticsearch_plugin elasticsearch_plugin.cpp elastic_client.cpp - deserializer.cpp bulker.cpp ${HEADERS} ) diff --git a/README.md b/README.md index ac78f43..c3aad91 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,17 @@ Nodeos plugin for archiving blockchain data into Elasticsearch, inspired by [mon **Currently the plugin only work with [official eosio repository](https://github.com/EOSIO/eos).** +## Getting Help +- For questions about this project: + - [Join our Telegram group](https://t.me/eosesplugin) + - [Open an issue](https://github.com/EOSLaoMao/elasticsearch_plugin/issues/new) + +## Indices + +It is recommended to use other tools for indices management. Check out [EOSLaoMao/elasticsearch-node](https://github.com/EOSLaoMao/elasticsearch-node). + +[Document examples](#document-examples) + ## Benchmark Detail: [Benchmark](./benchmark/benchmark.md) @@ -12,14 +23,14 @@ Detail: [Benchmark](./benchmark/benchmark.md) | | elapse(s) | speed(b/s) | | -------------------- |:---------:|:----------:| -| elasticsearch_plugin | 548 | 18.25 | +| elasticsearch_plugin | 266 | 37.59 | | mongo_db_plugin | 694 | 14.41 | ### Replay 100000 Block | | elapse(s) | speed(b/s) | | -------------------- |:---------:|:----------:| -| elasticsearch_plugin | 663 | 150.83 | +| elasticsearch_plugin | 354 | 282.49 | | mongo_db_plugin | 987 | 101.32 | ## Performance Tuning @@ -37,10 +48,8 @@ Example filters: In the benchmark, `elasticsearch_plugin` is running with default config. For production deploy, you can tweak some config. 
```text - --elastic-abi-cache-size arg (=2048) The maximum size of the abi cache for serializing data. - --elastic-thread-pool-size arg (=4) The maximum size of the abi cache for. - --elastic-bulker-pool-size arg (=2) The size of the data processing thread. - --elastic-bulk-size arg (=5) The size(megabytes) of the each bulk request. + --elastic-thread-pool-size arg (=4) The size of the data processing thread pool. + --elastic-bulk-size-mb arg (=5) The size(megabytes) of the each bulk request. ``` ## Installation @@ -67,6 +76,8 @@ sudo cp "lib/libcpr.so" "/usr/local/lib/libcpr.so" ```bash git clone https://github.com/EOSLaoMao/elasticsearch_plugin.git plugins/elasticsearch_plugin +cd plugins/elasticsearch_plugin +git submodule update --init --recursive ``` 2. Add subdirectory to `plugins/CMakeLists.txt`. @@ -95,56 +106,300 @@ target_link_libraries( ${NODE_EXECUTABLE_NAME} ## Usage -The usage of `elasticsearch_plugin` is similar to [mongo_db_plugin](https://github.com/EOSIO/eos/tree/master/plugins/mongo_db_plugin). +The usage of `elasticsearch_plugin` is similar to [mongo_db_plugin](https://github.com/EOSIO/eos/tree/master/plugins/mongo_db_plugin). It is recommended that a large `--abi-serializer-max-time-ms` value be passed into the nodeos running the elasticsearch_plugin as the default abi serializer time limit is not large enough to serialize large blocks. ```plain -Config Options for eosio::elasticsearch_plugin: - -q [ --elastic-queue-size ] arg (=1024) The target queue size between nodeos - and elasticsearch plugin thread. - --elastic-abi-cache-size arg (=2048) The maximum size of the abi cache for - serializing data. - --elastic-thread-pool-size arg (=4) The size of the data processing thread - pool. - --elastic-bulker-pool-size arg (=2) The size of the elasticsearch bulker - pool. - --elastic-bulk-size arg (=5) The size(megabytes) of the each bulk - request. 
- --elastic-index-wipe Required with --replay-blockchain, - --hard-replay-blockchain, or - --delete-all-blocks to delete - elasticsearch index.This option - required to prevent accidental wipe of - index. - --elastic-block-start arg (=0) If specified then only abi data pushed - to elasticsearch until specified block - is reached. - -u [ --elastic-url ] arg elasticsearch URL connection string If - not specified then plugin is disabled. - --elastic-user arg elasticsearch user. - --elastic-password arg elasticsearch password. - --elastic-store-blocks arg (=1) Enables storing blocks in - elasticsearch. - --elastic-store-block-states arg (=1) Enables storing block state in - elasticsearch. - --elastic-store-transactions arg (=1) Enables storing transactions in - elasticsearch. - --elastic-store-transaction-traces arg (=1) Enables storing transaction traces in - elasticsearch. - --elastic-store-action-traces arg (=1) Enables storing action traces in - elasticsearch. - --elastic-filter-on arg Track actions which match - receiver:action:actor. Receiver, - Action, & Actor may be blank to include - all. i.e. eosio:: or :transfer: Use * - or leave unspecified to include all. - --elastic-filter-out arg Do not track actions which match - receiver:action:actor. Receiver, - Action, & Actor may be blank to exclude - all. +Config Options for eosio::elasticsearch_plugin. + -q [ --elastic-queue-size ] arg (=1024) The target queue size between nodeos + and elasticsearch plugin thread. + --elastic-thread-pool-size arg (=4) The size of the data processing thread + pool. + --elastic-bulk-size-mb arg (=5) The size(megabytes) of the each bulk + request. + --elastic-abi-db-size-mb arg (=1024) Maximum size(megabytes) of the abi + database. + --elastic-block-start arg (=0) If specified then only abi data pushed + to elasticsearch until specified block + is reached. + -u [ --elastic-url ] arg elasticsearch URL connection string If + not specified then plugin is disabled. 
+ --elastic-user arg elasticsearch user. + --elastic-password arg elasticsearch password. + --elastic-store-blocks arg (=1) Enables storing blocks in + elasticsearch. + --elastic-store-block-states arg (=1) Enables storing block state in + elasticsearch. + --elastic-store-transactions arg (=1) Enables storing transactions in + elasticsearch. + --elastic-store-transaction-traces arg (=1) Enables storing transaction traces in + elasticsearch. + --elastic-store-action-traces arg (=1) Enables storing action traces in + elasticsearch. + --elastic-filter-on arg Track actions which match + receiver:action:actor. Receiver, + Action, & Actor may be blank to include + all. i.e. eosio:: or :transfer: Use * + or leave unspecified to include all. + --elastic-filter-out arg Do not track actions which match + receiver:action:actor. Receiver, + Action, & Actor may be blank to exclude + all. + --elastic-index-accounts arg (=accounts) elasticsearch accounts index name. + --elastic-index-blocks arg (=blocks) elasticsearch blocks index name. + --elastic-index-transactions arg (=transactions) elasticsearch transactions index name. + --elastic-index-block-states arg (=block_states) elasticsearch block_states index name. + --elastic-index-transaction-traces arg (=transaction_traces) elasticsearch transaction_traces index name. + --elastic-index-action-traces arg (=action_traces) elasticsearch action_traces index name. + ``` ## TODO -- [x] Imporve filer-on and filer-out feature, see: [https://github.com/EOSIO/eos/pull/5670](https://github.com/EOSIO/eos/pull/5670) - [ ] Due to `libcurl` [100-continue feature](https://curl.haxx.se/mail/lib-2017-07/0013.html), consider replace [EOSLaoMao/elasticlient](https://github.com/EOSLaoMao/elasticlient) with other simple http client like [https://cpp-netlib.org/#](https://cpp-netlib.org/#) -- [ ] Improve multi-thread efficiency. 
\ No newline at end of file + + +## Document examples + +* accounts +``` +{ + "creator": "eosio", + "pub_keys": [ + { + "permission": "owner", + "key": "EOS5Ga8VeykSY7SXJyHbnanSPHPcQ3LmKDtJjJBJHokgYDxeokP4R" + }, + { + "permission": "active", + "key": "EOS5Ga8VeykSY7SXJyHbnanSPHPcQ3LmKDtJjJBJHokgYDxeokP4R" + } + ], + "account_create_time": "2018-06-09T12:01:56.500", + "account_controls": [], + "name": "heztcnjtguge" +} +``` + +* action_traces +``` +{ + "receipt": { + "receiver": "eosio.token", + "act_digest": "94753d3277e8759aaa7e4ee8f19cfa36013180e0d66d62a4a2196d76786d574a", + "global_sequence": 1730634, + "recv_sequence": 459727, + "auth_sequence": [ + [ + "eosio", + 1730627 + ] + ], + "code_sequence": 1, + "abi_sequence": 1 + }, + "act": { + "account": "eosio.token", + "name": "transfer", + "authorization": [ + { + "actor": "eosio", + "permission": "active" + } + ], + "data": "{\"from\":\"eosio\",\"to\":\"eosio.ram\",\"quantity\":\"0.1219 EOS\",\"memo\":\"buy ram\"}", + "hex_data": "0000000000ea3055000090e602ea3055c30400000000000004454f5300000000076275792072616d" + }, + "context_free": false, + "elapsed": 80, + "console": "", + "trx_id": "98a9839544b7f4ae6ebc5fa1f9f17bd31027c4d845b7af9a6fa8cc8f9fafb0f1", + "block_num": 6643, + "block_time": "2018-06-09T12:53:42.500", + "producer_block_id": "000019f31e63bace5345f703d5140daeada8453abec7d5ebeb2d735b859e8e07", + "account_ram_deltas": [], + "except": null +} +``` + +* transaction_traces +``` +{ + "id": "711c6dab0e1b1a90aff0c54c3aed64f870a77ad35fd0aa6a19c6542c55653fd3", + "block_num": 7167, + "block_time": "2018-06-09T12:58:06.000", + "producer_block_id": "00001bff0cc43235d99514029440013e2fb7b6bd5eb6fb5c848954a088a9e0f9", + "receipt": { + "status": "executed", + "cpu_usage_us": 129052, + "net_usage_words": 1298 + }, + "elapsed": 44456, + "net_usage": 10384, + "scheduled": false, + "action_traces": [ + { + "receipt": { + "receiver": "eosio", + "act_digest": 
"6556d890f6238c244020411322d6a40b5c03519e3da17dc05f525387ae850914", + "global_sequence": 1874256, + "recv_sequence": 878486, + "auth_sequence": [ + [ + "eosio", + 1874249 + ] + ], + "code_sequence": 2, + "abi_sequence": 2 + }, + "act": { + "account": "eosio", + "name": "newaccount", + "authorization": [ + { + "actor": "eosio", + "permission": "active" + } + ], + "data": { + "creator": "eosio", + "name": "gi4tcnzwhege", + "owner": { + "threshold": 1, + "keys": [ + { + "key": "EOS8M7Qbuq2U5PBph9QhBm5o7sHzuxCgMewwWnmLk9be2WshZEXZB", + "weight": 1 + } + ], + "accounts": [], + "waits": [] + }, + "active": { + "threshold": 1, + "keys": [ + { + "key": "EOS8M7Qbuq2U5PBph9QhBm5o7sHzuxCgMewwWnmLk9be2WshZEXZB", + "weight": 1 + } + ], + "accounts": [], + "waits": [] + } + }, + "hex_data": "0000000000ea3055a0986afc4f94896301000000010003c7893323d9b3bd2ad4aab6708177e17a7254fdd28cc5700a54fbe3d5869e424d0100000001000000010003c7893323d9b3bd2ad4aab6708177e17a7254fdd28cc5700a54fbe3d5869e424d01000000" + }, + "context_free": false, + "elapsed": 184, + "console": "", + "trx_id": "711c6dab0e1b1a90aff0c54c3aed64f870a77ad35fd0aa6a19c6542c55653fd3", + "block_num": 7167, + "block_time": "2018-06-09T12:58:06.000", + "producer_block_id": "00001bff0cc43235d99514029440013e2fb7b6bd5eb6fb5c848954a088a9e0f9", + "account_ram_deltas": [ + { + "account": "gi4tcnzwhege", + "delta": 2996 + } + ], + "except": null, + "inline_traces": [] + } + ], + "except": null +} +``` + +* blocks +``` +{ + "timestamp": "2018-06-09T12:03:37.500", + "producer": "eosio", + "confirmed": 0, + "previous": "00000288461e6ef331cb87ebf3e8fda122dc4c1b5cfd313b080a762ce0a472df", + "transaction_mroot": "a850a72dc0fc67adce701455551c1d82246fae12055f5d10b66ba55921a0dc34", + "action_mroot": "1b79fd3dd0e544c4eacce998cf6d9a7fa9a3d4533590779b94d2be002fd52618", + "schedule_version": 0, + "new_producers": null, + "header_extensions": [], + "producer_signature": 
"SIG_K1_KfX4VW3aLd2KjsJCqWQpDdauT7xJwERXpckJhUG8UPNANtgqVjRZEJBGKH7DJ7ZJr9E1mmTkzqSz4KQ2FyLBpAv4KPqpxt", + "transactions": [...], + "block_extensions": [], + "irreversible": true, + "validated": true +} +``` + +* block_states +``` +{ + "active_schedule": { + "version": 1, + "producers": [ + { + "producer_name": "genesisblock", + "block_signing_key": "EOS8Yid3mE5bwWMvGGKYEDxFRGHostu5xCzFanyJP1UdgZ5mpPdwZ" + } + ] + }, + "in_current_chain": true, + "pending_schedule_lib_num": 12149, + "dpos_proposed_irreversible_blocknum": 154026, + "dpos_irreversible_blocknum": 154025, + "pending_schedule": { + "version": 1, + "producers": [] + }, + "producer_to_last_produced": [ + [ + "eosio", + 12150 + ], + [ + "genesisblock", + 154026 + ] + ], + "pending_schedule_hash": "c43882d5411af19d8596d5d835b3f4bd6a7fd36cc4c7fb55942ef11f8d1473b6", + "blockroot_merkle": { + "_active_nodes": [ + "000259a9acccbad8e577c2ec8bd87e2e956b21ebdf1524c01ea65dd464ce0fec", + "3323314d8d9f2eed9233911c01ec52a0b22ec59c5b2902ac9396080bad602a90", + "df39e691954565362fbedd6be43667b551594a7e37ad1f7e2d4f8f25c545b1e8", + "6931158aae9e5f5391ca559eb0503f1631de3ddff73785d3bf98a12418cbfe77", + "e4e2fdd9850efba91ed0f12efcecebe54b37ce9264b50722948dd23a8fb49f62", + "2930747c325841aa6926c3b8143402bb5e35c88f1a5af76cf2c9c09761be9863", + "2cde046c52b281ed55abb64a9b3da2882c19bd8100734bf9774f94b539870479", + "93eade5dac8991d23381f380f9121568c47b9e3c37317a5de3c28503aae93e53", + "827270b90af501d41051054f541ec063dfbab4e4a2eb4d6f85ffac575df777f0", + "0f8be62807a52c66552ac3ffb9406fe9b644d4c7a38fc195205c6a16c549aa3b" + ], + "_node_count": 154025 + }, + "confirm_count": [], + "confirmations": [], + "block_num": 154026, + "validated": true, + "bft_irreversible_blocknum": 0, + "header": { + "header_extensions": [], + "previous": "000259a9acccbad8e577c2ec8bd87e2e956b21ebdf1524c01ea65dd464ce0fec", + "schedule_version": 1, + "producer": "genesisblock", + "transaction_mroot": 
"0000000000000000000000000000000000000000000000000000000000000000", + "producer_signature": "SIG_K1_K7EFKTaKJrEeQLXSdmBoC15yva6otnavPq4LGNZcojuSYtsnLuPtZDBf7Sqgw3U2cGWjnS7Ncg5cY2KLNiUFMKYgAP9D3m", + "confirmed": 0, + "action_mroot": "18343eeab718c318d906b20cee90c8c6eeeffef04cdd514f0a22e75f24c21cf1", + "timestamp": "2018-06-11T08:21:42.000" + }, + "producer_to_last_implied_irb": [ + [ + "genesisblock", + 154025 + ] + ], + "block_signing_key": "EOS8Yid3mE5bwWMvGGKYEDxFRGHostu5xCzFanyJP1UdgZ5mpPdwZ", + "id": "000259aaeecdcdd5af5196321bc74643169962a4c20dff0cacb6225c4a7ab2c2", + "irreversible": true +} +``` diff --git a/ThreadPool b/ThreadPool new file mode 160000 index 0000000..8320d95 --- /dev/null +++ b/ThreadPool @@ -0,0 +1 @@ +Subproject commit 8320d958c298bd5c0b3fd2aaab2f8042db299ed4 diff --git a/benchmark/benchmark.md b/benchmark/benchmark.md index f8000dd..e38de93 100644 --- a/benchmark/benchmark.md +++ b/benchmark/benchmark.md @@ -2,12 +2,12 @@ | Replay 10000 Block | elapse(s) | speed(b/s) | | -------------------- |:---------:|:----------:| -| elasticsearch_plugin | 548 | 18.25 | +| elasticsearch_plugin | 266 | 37.59 | | mongo_db_plugin | 694 | 14.41 | | Replay 100000 Block | elapse(s) | speed(b/s) | | -------------------- |:---------:|:----------:| -| elasticsearch_plugin | 663 | 150.83 | +| elasticsearch_plugin | 354 | 282.49 | | mongo_db_plugin | 987 | 101.32 | ## Hardware @@ -28,16 +28,18 @@ The MongoDB and Elasticsearch are both running in the docker container with basi ### Command-line ```bash -./build/programs/nodeos/nodeos --data-dir=dev_config/data \ - --config-dir=dev_config \ +./build/programs/nodeos/nodeos \ + --data-dir=data \ + --config-dir=config \ + --abi-serializer-max-time-ms=1000000 \ --replay-blockchain \ --elastic-url=http://localhost:9200/ \ --elastic-queue-size=512 \ - --elastic-abi-cache-size=8192 \ --elastic-index-wipe -./build/programs/nodeos/nodeos --data-dir=dev_config/data \ - --config-dir=dev_config \ 
+./build/programs/nodeos/nodeos \ + --data-dir=data \ + --config-dir=config \ --replay-blockchain \ --mongodb-uri=mongodb://root:example@localhost:27017/eos?authSource=admin \ --mongodb-queue-size=512 \ diff --git a/bulker.cpp b/bulker.cpp index d39d7c3..20aae39 100644 --- a/bulker.cpp +++ b/bulker.cpp @@ -58,7 +58,7 @@ bulker_pool::bulker_pool(size_t size, size_t bulk_size, const std::string &user, const std::string &password): pool_size(size), bulk_size(bulk_size) { for (int i = 0; i < pool_size; ++i) { - bulker_vec.emplace_back( new bulker(bulk_size, url_list, user, password) ); + bulkers.emplace_back( new bulker(bulk_size, url_list, user, password) ); } } @@ -69,12 +69,12 @@ bulker& bulker_pool::get() { size_t cur_idx = index % pool_size; - auto ptr = bulker_vec[cur_idx].get(); + auto ptr = bulkers[cur_idx].get(); if ( ptr->size() >= bulk_size ) { cur_idx = (cur_idx + 1) % pool_size; index = cur_idx; - return *bulker_vec[cur_idx].get(); + return *bulkers[cur_idx].get(); } else { return *ptr; } diff --git a/bulker.hpp b/bulker.hpp index 29dd9bd..22a03fc 100644 --- a/bulker.hpp +++ b/bulker.hpp @@ -43,7 +43,7 @@ class bulker_pool bulker& get(); private: - std::vector> bulker_vec; + std::vector> bulkers; size_t pool_size; size_t bulk_size; std::atomic index {0}; diff --git a/deserializer.cpp b/deserializer.cpp deleted file mode 100644 index cedd9b7..0000000 --- a/deserializer.cpp +++ /dev/null @@ -1,118 +0,0 @@ -#include "deserializer.hpp" -#include "exceptions.hpp" - -#include -#include - -namespace eosio -{ - -void deserializer::purge_abi_cache() { - if( abi_cache_index.size() < abi_cache_size ) return; - - // remove the oldest (smallest) last accessed - auto& idx = abi_cache_index.get(); - auto itr = idx.begin(); - if( itr != idx.end() ) { - idx.erase( itr ); - } -} - -optional deserializer::get_abi_by_account(const account_name &name) { - fc::variant res; - try { - es_client.get("accounts", std::to_string(name.value), res); - return res["_source"]["abi"]; - } 
catch( elasticlient::ConnectionException& e) { - elog( "elasticsearch connection error, line ${line}, ${what}", - ( "line", __LINE__ )( "what", e.what() )); - } catch( ... ) { - // missing abi field - } - return optional(); -} - -optional deserializer::find_abi_cache(const account_name &name) { - auto itr = abi_cache_index.find( name ); - if( itr != abi_cache_index.end() ) { - abi_cache_index.modify( itr, []( auto& entry ) { - entry.last_accessed = fc::time_point::now(); - }); - - return itr->serializer; - } - return optional(); -} - -void deserializer::insert_abi_cache( const abi_cache &entry ) { - abi_cache_index.insert( entry ); -} - - -void deserializer::erase_abi_cache(const account_name &name) { - abi_cache_index.erase( name ); -} - -optional deserializer::get_abi_serializer( const account_name &name ) { - if( name.good()) { - try { - - auto abi_opt = find_abi_cache(name); - if ( abi_opt.valid() ) { - return abi_opt; - } - - auto abi_v = get_abi_by_account(name); - if( abi_v.valid() ) { - abi_def abi; - try { - abi = abi_v->as(); - } catch (...) 
{ - ilog( "Unable to convert account abi to abi_def for ${name}", ( "name", name )); - return optional(); - } - - purge_abi_cache(); // make room if necessary - abi_cache entry; - entry.account = name; - entry.last_accessed = fc::time_point::now(); - abi_serializer abis; - if( name == chain::config::system_account_name ) { - // redefine eosio setabi.abi from bytes to abi_def - // Done so that abi is stored as abi_def in elasticsearch instead of as bytes - auto itr = std::find_if( abi.structs.begin(), abi.structs.end(), - []( const auto& s ) { return s.name == "setabi"; } ); - if( itr != abi.structs.end() ) { - auto itr2 = std::find_if( itr->fields.begin(), itr->fields.end(), - []( const auto& f ) { return f.name == "abi"; } ); - if( itr2 != itr->fields.end() ) { - if( itr2->type == "bytes" ) { - itr2->type = "abi_def"; - // unpack setabi.abi as abi_def instead of as bytes - abis.add_specialized_unpack_pack( "abi_def", - std::make_pair( - []( fc::datastream& stream, bool is_array, bool is_optional ) -> fc::variant { - EOS_ASSERT( !is_array && !is_optional, chain::elasticsearch_exception, "unexpected abi_def"); - chain::bytes temp; - fc::raw::unpack( stream, temp ); - return fc::variant( fc::raw::unpack( temp ) ); - }, - []( const fc::variant& var, fc::datastream& ds, bool is_array, bool is_optional ) { - EOS_ASSERT( false, chain::elasticsearch_exception, "never called" ); - } - ) ); - } - } - } - } - abis.set_abi( abi, abi_serializer_max_time ); - entry.serializer.emplace( std::move( abis ) ); - insert_abi_cache( entry ); - return entry.serializer; - } - } FC_CAPTURE_AND_LOG((name)) - } - return optional(); -} - -} diff --git a/deserializer.hpp b/deserializer.hpp deleted file mode 100644 index b5e1cc6..0000000 --- a/deserializer.hpp +++ /dev/null @@ -1,61 +0,0 @@ -#pragma once -#include -#include -#include - -#include "elastic_client.hpp" - -namespace eosio { - -class deserializer -{ -public: - deserializer(size_t size, fc::microseconds abi_serializer_max_time, - 
const std::vector url_list, - const std::string &user, const std::string &password): - abi_cache_size(size), abi_serializer_max_time(abi_serializer_max_time), - es_client(url_list, user, password) {} - - template - fc::variant to_variant_with_abi( const T& obj ) { - fc::variant pretty_output; - abi_serializer::to_variant( obj, pretty_output, - [&]( account_name n ) { return get_abi_serializer( n ); }, - abi_serializer_max_time ); - return pretty_output; - } - - void erase_abi_cache(const account_name &name); - -private: - struct by_account; - struct by_last_access; - - struct abi_cache { - account_name account; - fc::time_point last_accessed; - fc::optional serializer; - }; - - void purge_abi_cache(); - void insert_abi_cache( const abi_cache &entry ); - optional find_abi_cache(const account_name &name); - optional get_abi_by_account(const account_name &name); - optional get_abi_serializer( const account_name &name ); - - typedef boost::multi_index_container, member >, - ordered_non_unique< tag, member > - > - > abi_cache_index_t; - - size_t abi_cache_size = 0; - abi_cache_index_t abi_cache_index; - fc::microseconds abi_serializer_max_time; - - elastic_client es_client; - -}; - -} diff --git a/elastic_client.hpp b/elastic_client.hpp index d77cec4..4088209 100644 --- a/elastic_client.hpp +++ b/elastic_client.hpp @@ -11,7 +11,7 @@ class elastic_client { public: elastic_client(const std::vector url_list, const std::string &user, const std::string &password) - :client(url_list, user, password, 100000) {}; + :client(url_list, user, password, std::numeric_limits::max()) {}; void delete_index(const std::string &index_name); void init_index(const std::string &index_name, const std::string &mappings); diff --git a/elasticsearch_plugin.cpp b/elasticsearch_plugin.cpp index 96b32ec..6dc11f2 100644 --- a/elasticsearch_plugin.cpp +++ b/elasticsearch_plugin.cpp @@ -10,26 +10,24 @@ #include #include -#include -#include #include #include #include -#include -#include -#include 
+#include +#include #include #include #include +#include #include #include "elastic_client.hpp" #include "exceptions.hpp" -#include "mappings.hpp" -#include "deserializer.hpp" +#include "serializer.hpp" #include "bulker.hpp" +#include "ThreadPool/ThreadPool.h" namespace eosio { @@ -75,33 +73,36 @@ class elasticsearch_plugin_impl { void consume_blocks(); + void check_task_queue_size(); + void accepted_block( const chain::block_state_ptr& ); void applied_irreversible_block(const chain::block_state_ptr&); void accepted_transaction(const chain::transaction_metadata_ptr&); void applied_transaction(const chain::transaction_trace_ptr&); - void process_accepted_transaction(const chain::transaction_metadata_ptr&); - void _process_accepted_transaction(const chain::transaction_metadata_ptr&); - void process_applied_transaction(const chain::transaction_trace_ptr&); - void _process_applied_transaction(const chain::transaction_trace_ptr&); - void process_accepted_block( const chain::block_state_ptr& ); - void _process_accepted_block( const chain::block_state_ptr& ); - void process_irreversible_block(const chain::block_state_ptr&); - void _process_irreversible_block(const chain::block_state_ptr&); + + void process_applied_transaction(chain::transaction_trace_ptr); + void _process_applied_transaction(chain::transaction_trace_ptr); + void process_accepted_transaction(chain::transaction_metadata_ptr); + void _process_accepted_transaction(chain::transaction_metadata_ptr); + void process_accepted_block( chain::block_state_ptr ); + void _process_accepted_block( chain::block_state_ptr ); + void process_irreversible_block( chain::block_state_ptr ); + void _process_irreversible_block( chain::block_state_ptr ); void upsert_account( std::unordered_map> &account_upsert_actions, const chain::action& act, const chain::block_timestamp_type& block_time ); - void create_new_account( fc::mutable_variant_object& param_doc, const chain::newaccount& newacc, std::chrono::milliseconds& now, - const 
chain::block_timestamp_type& block_time ); - void update_account_auth( fc::mutable_variant_object& param_doc, const chain::updateauth& update, std::chrono::milliseconds& now ); - void delete_account_auth( fc::mutable_variant_object& param_doc, const chain::deleteauth& del, std::chrono::milliseconds& now ); - void upsert_account_setabi( fc::mutable_variant_object& param_doc, const chain::setabi& setabi, std::chrono::milliseconds& now ); + void create_new_account( fc::mutable_variant_object& param_doc, const chain::newaccount& newacc, const chain::block_timestamp_type& block_time ); + void update_account_auth( fc::mutable_variant_object& param_doc, const chain::updateauth& update ); + void delete_account_auth( fc::mutable_variant_object& param_doc, const chain::deleteauth& del ); + void upsert_account_setabi( fc::mutable_variant_object& param_doc, const chain::setabi& setabi ); - /// @return true if act should be added to mongodb, false to skip it - bool filter_include( const chain::action_trace& action_trace ) const; + /// @return true if act should be added to elasticsearch, false to skip it + bool filter_include( const account_name& receiver, const action_name& act_name, + const vector& authorization ) const; + bool filter_include( const transaction& trx ) const; void init(); - void delete_index(); template void queue(Queue& queue, const Entry& e); @@ -119,14 +120,14 @@ class elasticsearch_plugin_impl { bool store_transaction_traces = true; bool store_action_traces = true; - std::unique_ptr es_client; - std::unique_ptr abi_deserializer; - std::unique_ptr bulk_pool; - std::unique_ptr thr_pool; + size_t max_task_queue_size = 0; + int task_queue_sleep_time = 0; + + std::queue> upsert_account_task_queue; + std::mutex upsert_account_task_mtx; size_t max_queue_size = 0; int queue_sleep_time = 0; - size_t abi_cache_size = 0; std::deque transaction_metadata_queue; std::deque transaction_metadata_process_queue; std::deque transaction_trace_queue; @@ -135,13 +136,17 @@ 
class elasticsearch_plugin_impl { std::deque block_state_process_queue; std::deque irreversible_block_state_queue; std::deque irreversible_block_state_process_queue; - boost::mutex mtx; - boost::condition_variable condition; - boost::thread consume_thread; - boost::atomic done{false}; - boost::atomic startup{true}; + std::mutex mtx; + std::condition_variable condition; + std::thread consume_thread; + std::atomic done{false}; + std::atomic startup{true}; fc::optional chain_id; - fc::microseconds abi_serializer_max_time; + + std::unique_ptr es_client; + std::unique_ptr serializer; + std::unique_ptr bulk_pool; + std::unique_ptr thread_pool; static const action_name newaccount; static const action_name setabi; @@ -150,12 +155,13 @@ class elasticsearch_plugin_impl { static const permission_name owner; static const permission_name active; - static const std::string accounts_index; - static const std::string blocks_index; - static const std::string trans_index; - static const std::string block_states_index; - static const std::string trans_traces_index; - static const std::string action_traces_index; + std::string accounts_index = "accounts"; + std::string blocks_index = "blocks"; + std::string trans_index = "transactions"; + std::string block_states_index = "block_states"; + std::string trans_traces_index = "transaction_traces"; + std::string action_traces_index = "action_traces"; + }; const action_name elasticsearch_plugin_impl::newaccount = chain::newaccount::get_name(); @@ -165,27 +171,23 @@ const action_name elasticsearch_plugin_impl::deleteauth = chain::deleteauth::get const permission_name elasticsearch_plugin_impl::owner = chain::config::owner_name; const permission_name elasticsearch_plugin_impl::active = chain::config::active_name; -const std::string elasticsearch_plugin_impl::accounts_index = "accounts"; -const std::string elasticsearch_plugin_impl::blocks_index = "blocks"; -const std::string elasticsearch_plugin_impl::trans_index = "transactions"; -const 
std::string elasticsearch_plugin_impl::block_states_index = "block_states"; -const std::string elasticsearch_plugin_impl::trans_traces_index = "transaction_traces"; -const std::string elasticsearch_plugin_impl::action_traces_index = "action_traces"; -bool elasticsearch_plugin_impl::filter_include( const chain::action_trace& action_trace ) const { +bool elasticsearch_plugin_impl::filter_include( const account_name& receiver, const action_name& act_name, + const vector& authorization ) const +{ bool include = false; if( filter_on_star ) { include = true; } else { - auto itr = std::find_if( filter_on.cbegin(), filter_on.cend(), [&action_trace]( const auto& filter ) { - return filter.match( action_trace.receipt.receiver, action_trace.act.name, 0 ); + auto itr = std::find_if( filter_on.cbegin(), filter_on.cend(), [&receiver, &act_name]( const auto& filter ) { + return filter.match( receiver, act_name, 0 ); } ); if( itr != filter_on.cend() ) { include = true; } else { - for( const auto& a : action_trace.act.authorization ) { - auto itr = std::find_if( filter_on.cbegin(), filter_on.cend(), [&action_trace, &a]( const auto& filter ) { - return filter.match( action_trace.receipt.receiver, action_trace.act.name, a.actor ); + for( const auto& a : authorization ) { + auto itr = std::find_if( filter_on.cbegin(), filter_on.cend(), [&receiver, &act_name, &a]( const auto& filter ) { + return filter.match( receiver, act_name, a.actor ); } ); if( itr != filter_on.cend() ) { include = true; @@ -196,15 +198,16 @@ bool elasticsearch_plugin_impl::filter_include( const chain::action_trace& actio } if( !include ) { return false; } + if( filter_out.empty() ) { return true; } - auto itr = std::find_if( filter_out.cbegin(), filter_out.cend(), [&action_trace]( const auto& filter ) { - return filter.match( action_trace.receipt.receiver, action_trace.act.name, 0 ); + auto itr = std::find_if( filter_out.cbegin(), filter_out.cend(), [&receiver, &act_name]( const auto& filter ) { + return 
filter.match( receiver, act_name, 0 ); } ); if( itr != filter_out.cend() ) { return false; } - for( const auto& a : action_trace.act.authorization ) { - auto itr = std::find_if( filter_out.cbegin(), filter_out.cend(), [&action_trace, &a]( const auto& filter ) { - return filter.match( action_trace.receipt.receiver, action_trace.act.name, a.actor ); + for( const auto& a : authorization ) { + auto itr = std::find_if( filter_out.cbegin(), filter_out.cend(), [&receiver, &act_name, &a]( const auto& filter ) { + return filter.match( receiver, act_name, a.actor ); } ); if( itr != filter_out.cend() ) { return false; } } @@ -212,6 +215,29 @@ bool elasticsearch_plugin_impl::filter_include( const chain::action_trace& actio return true; } +bool elasticsearch_plugin_impl::filter_include( const transaction& trx ) const +{ + if( !filter_on_star || !filter_out.empty() ) { + bool include = false; + for( const auto& a : trx.actions ) { + if( filter_include( a.account, a.name, a.authorization ) ) { + include = true; + break; + } + } + if( !include ) { + for( const auto& a : trx.context_free_actions ) { + if( filter_include( a.account, a.name, a.authorization ) ) { + include = true; + break; + } + } + } + return include; + } + return true; +} + elasticsearch_plugin_impl::elasticsearch_plugin_impl() { } @@ -225,7 +251,6 @@ elasticsearch_plugin_impl::~elasticsearch_plugin_impl() condition.notify_one(); consume_thread.join(); - thr_pool->join(); } catch( std::exception& e ) { elog( "Exception on elasticsearch_plugin shutdown of consume thread: ${e}", ("e", e.what())); } @@ -234,7 +259,7 @@ elasticsearch_plugin_impl::~elasticsearch_plugin_impl() template void elasticsearch_plugin_impl::queue( Queue& queue, const Entry& e ) { - boost::mutex::scoped_lock lock( mtx ); + std::unique_lock lock( mtx ); auto queue_size = queue.size(); if( queue_size > max_queue_size ) { lock.unlock(); @@ -242,7 +267,7 @@ void elasticsearch_plugin_impl::queue( Queue& queue, const Entry& e ) { queue_sleep_time += 
10; if( queue_sleep_time > 1000 ) wlog("queue size: ${q}", ("q", queue_size)); - boost::this_thread::sleep_for( boost::chrono::milliseconds( queue_sleep_time )); + std::this_thread::sleep_for( std::chrono::milliseconds( queue_sleep_time )); lock.lock(); } else { queue_sleep_time -= 10; @@ -335,10 +360,10 @@ void elasticsearch_plugin_impl::accepted_block( const chain::block_state_ptr& bs } } -void elasticsearch_plugin_impl::process_accepted_transaction( const chain::transaction_metadata_ptr& t ) { +void elasticsearch_plugin_impl::process_accepted_transaction( chain::transaction_metadata_ptr t ) { try { if( start_block_reached ) { - _process_accepted_transaction( t ); + _process_accepted_transaction( std::move(t) ); } } catch (fc::exception& e) { elog("FC Exception while processing accepted transaction metadata: ${e}", ("e", e.to_detail_string())); @@ -349,10 +374,10 @@ void elasticsearch_plugin_impl::process_accepted_transaction( const chain::trans } } -void elasticsearch_plugin_impl::process_applied_transaction( const chain::transaction_trace_ptr& t ) { +void elasticsearch_plugin_impl::process_applied_transaction( chain::transaction_trace_ptr t ) { try { // always call since we need to capture setabi on accounts even if not storing transaction traces - _process_applied_transaction( t ); + _process_applied_transaction( std::move(t) ); } catch (fc::exception& e) { elog("FC Exception while processing applied transaction trace: ${e}", ("e", e.to_detail_string())); } catch (std::exception& e) { @@ -362,10 +387,10 @@ void elasticsearch_plugin_impl::process_applied_transaction( const chain::transa } } -void elasticsearch_plugin_impl::process_irreversible_block(const chain::block_state_ptr& bs) { +void elasticsearch_plugin_impl::process_irreversible_block( chain::block_state_ptr bs) { try { if( start_block_reached ) { - _process_irreversible_block( bs ); + _process_irreversible_block( std::move(bs) ); } } catch (fc::exception& e) { elog("FC Exception while processing 
irreversible block: ${e}", ("e", e.to_detail_string())); @@ -376,10 +401,10 @@ void elasticsearch_plugin_impl::process_irreversible_block(const chain::block_st } } -void elasticsearch_plugin_impl::process_accepted_block( const chain::block_state_ptr& bs ) { +void elasticsearch_plugin_impl::process_accepted_block( chain::block_state_ptr bs ) { try { if( start_block_reached ) { - _process_accepted_block( bs ); + _process_accepted_block( std::move(bs) ); } } catch (fc::exception& e) { elog("FC Exception while processing accepted block trace ${e}", ("e", e.to_string())); @@ -391,7 +416,7 @@ void elasticsearch_plugin_impl::process_accepted_block( const chain::block_state } void elasticsearch_plugin_impl::create_new_account( - fc::mutable_variant_object& param_doc, const chain::newaccount& newacc, std::chrono::milliseconds& now, + fc::mutable_variant_object& param_doc, const chain::newaccount& newacc, const chain::block_timestamp_type& block_time ) { fc::variants pub_keys; @@ -400,7 +425,6 @@ void elasticsearch_plugin_impl::create_new_account( param_doc("name", newacc.name.to_string()); param_doc("creator", newacc.creator.to_string()); param_doc("account_create_time", block_time); - param_doc("createAt", now.count()); for( const auto& account : newacc.owner.accounts ) { fc::mutable_variant_object account_entry; @@ -435,7 +459,7 @@ void elasticsearch_plugin_impl::create_new_account( } void elasticsearch_plugin_impl::update_account_auth( - fc::mutable_variant_object& param_doc, const chain::updateauth& update, std::chrono::milliseconds& now ) + fc::mutable_variant_object& param_doc, const chain::updateauth& update) { fc::variants pub_keys; fc::variants account_controls; @@ -457,24 +481,23 @@ void elasticsearch_plugin_impl::update_account_auth( param_doc("permission", update.permission.to_string()); param_doc("pub_keys", pub_keys); param_doc("account_controls", account_controls); - param_doc("updateAt", now.count()); } void elasticsearch_plugin_impl::delete_account_auth( - 
fc::mutable_variant_object& param_doc, const chain::deleteauth& del, std::chrono::milliseconds& now ) + fc::mutable_variant_object& param_doc, const chain::deleteauth& del) { param_doc("permission", del.permission.to_string()); - param_doc("updateAt", now.count()); } void elasticsearch_plugin_impl::upsert_account_setabi( - fc::mutable_variant_object& param_doc, const chain::setabi& setabi, std::chrono::milliseconds& now ) + fc::mutable_variant_object& param_doc, const chain::setabi& setabi) { abi_def abi_def = fc::raw::unpack( setabi.abi ); + serializer->upsert_abi_cache( setabi.account, abi_def ); + param_doc("name", setabi.account.to_string()); param_doc("abi", abi_def); - param_doc("updateAt", now.count()); } void elasticsearch_plugin_impl::upsert_account( @@ -484,9 +507,6 @@ void elasticsearch_plugin_impl::upsert_account( if (act.account != chain::config::system_account_name) return; - std::chrono::milliseconds now = std::chrono::duration_cast( - std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()} ); - uint64_t account_id; std::string upsert_script; fc::mutable_variant_object param_doc; @@ -495,52 +515,46 @@ void elasticsearch_plugin_impl::upsert_account( if( act.name == newaccount ) { auto newacc = act.data_as(); - create_new_account(param_doc, newacc, now, block_time); + create_new_account(param_doc, newacc, block_time); account_id = newacc.name.value; upsert_script = "ctx._source.name = params[\"%1%\"].name;" "ctx._source.creator = params[\"%1%\"].creator;" "ctx._source.account_create_time = params[\"%1%\"].account_create_time;" "ctx._source.pub_keys = params[\"%1%\"].pub_keys;" - "ctx._source.account_controls = params[\"%1%\"].account_controls;" - "ctx._source.createAt = params[\"%1%\"].createAt;"; + "ctx._source.account_controls = params[\"%1%\"].account_controls;"; } else if( act.name == updateauth ) { const auto update = act.data_as(); - update_account_auth(param_doc, update, now); + update_account_auth(param_doc, update); 
account_id = update.account.value; upsert_script = "ctx._source.pub_keys.removeIf(item -> item.permission == params[\"%1%\"].permission);" "ctx._source.account_controls.removeIf(item -> item.permission == params[\"%1%\"].permission);" "ctx._source.pub_keys.addAll(params[\"%1%\"].pub_keys);" - "ctx._source.account_controls.addAll(params[\"%1%\"].account_controls);" - "ctx._source.updateAt = params[\"%1%\"].updateAt;"; + "ctx._source.account_controls.addAll(params[\"%1%\"].account_controls);"; } else if( act.name == deleteauth ) { const auto del = act.data_as(); - delete_account_auth(param_doc, del, now); + delete_account_auth(param_doc, del); account_id = del.account.value; upsert_script = "ctx._source.pub_keys.removeIf(item -> item.permission == params[\"%1%\"].permission);" - "ctx._source.account_controls.removeIf(item -> item.permission == params[\"%1%\"].permission);" - "ctx._source.updateAt = params[\"%1%\"].updateAt;"; + "ctx._source.account_controls.removeIf(item -> item.permission == params[\"%1%\"].permission);"; } else if( act.name == setabi ) { auto setabi = act.data_as(); - abi_deserializer->erase_abi_cache( setabi.account ); - - upsert_account_setabi(param_doc, setabi, now); + upsert_account_setabi(param_doc, setabi); account_id = setabi.account.value; upsert_script = "ctx._source.name = params[\"%1%\"].name;" - "ctx._source.abi = params[\"%1%\"].abi;" - "ctx._source.updateAt = params[\"%1%\"].updateAt;"; + "ctx._source.abi = params[\"%1%\"].abi;"; } - if ( !upsert_script.empty() ) { + if ( start_block_reached && !upsert_script.empty() ) { auto it = account_upsert_actions.find(account_id); if ( it != account_upsert_actions.end() ) { auto idx = std::to_string(it->second.second.size()); @@ -561,149 +575,219 @@ void elasticsearch_plugin_impl::upsert_account( } } -void elasticsearch_plugin_impl::_process_accepted_block( const chain::block_state_ptr& bs ) { +void elasticsearch_plugin_impl::_process_applied_transaction( chain::transaction_trace_ptr t ) { + + 
std::unordered_map> account_upsert_actions; + std::vector> base_action_traces; // without inline action traces + + bool executed = t->receipt.valid() && t->receipt->status == chain::transaction_receipt_header::executed; - auto block_num = bs->block_num; - if( block_num % 1000 == 0 ) - ilog( "block_num: ${b}", ("b", block_num) ); + std::stack> stack; + for( auto& atrace : t->action_traces ) { + stack.emplace(atrace); - const auto block_id = bs->id; - const auto block_id_str = block_id.str(); + while ( !stack.empty() ) + { + auto &atrace = stack.top().get(); + stack.pop(); - auto now = std::chrono::duration_cast( - std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); + if( executed && atrace.receipt.receiver == chain::config::system_account_name ) { + upsert_account( account_upsert_actions, atrace.act, atrace.block_time ); + } - if( store_block_states ) { - auto source = - "if (!ctx._source.containsKey(\"block_num\")) ctx._source.block_num = params.block_num;" - "if (!ctx._source.containsKey(\"block_id\")) ctx._source.block_id = params.block_id;" - "if (!ctx._source.containsKey(\"validated\")) ctx._source.validated = params.validated;" - "if (!ctx._source.containsKey(\"block_header_state\")) ctx._source.block_header_state = params.block_header_state;" - "ctx._source.createAt = params.createAt;"; + if( start_block_reached && filter_include( atrace.receipt.receiver, atrace.act.name, atrace.act.authorization ) ) { + base_action_traces.emplace_back( atrace ); + } - fc::mutable_variant_object doc; - fc::mutable_variant_object params_doc; - fc::mutable_variant_object script_doc; + auto &inline_traces = atrace.inline_traces; + for( auto it = inline_traces.rbegin(); it != inline_traces.rend(); ++it ) { + stack.emplace(*it); + } + } + } - params_doc("block_num", static_cast(block_num)); - params_doc("block_id", block_id_str); - params_doc("validated", bs->validated); - params_doc("block_header_state", bs); - params_doc("createAt", now.count()); + if ( 
!account_upsert_actions.empty() ) { - script_doc("source", source); - script_doc("lang", "painless"); - script_doc("params", params_doc); + auto f = [ account_upsert_actions{std::move(account_upsert_actions)}, this ]() + { + elasticlient::SameIndexBulkData bulk_account_upserts(accounts_index); + for( auto& action : account_upsert_actions ) { - doc("script", script_doc); - doc("scripted_upsert", true); - doc("upsert", fc::variant_object()); + fc::mutable_variant_object source_doc; + fc::mutable_variant_object script_doc; - boost::asio::post( *thr_pool, - [ doc{std::move(doc)}, block_id_str, this ]() - { - fc::mutable_variant_object action_doc; - action_doc("_index", block_states_index); - action_doc("_type", "_doc"); - action_doc("_id", block_id_str); - action_doc("retry_on_conflict", 100); + script_doc("lang", "painless"); + script_doc("source", action.second.first); + script_doc("params", action.second.second); - auto action = fc::json::to_string( fc::variant_object("update", action_doc) ); - auto json = fc::json::to_string( doc ); + source_doc("scripted_upsert", true); + source_doc("upsert", fc::variant_object()); + source_doc("script", script_doc); + + auto id = std::to_string(action.first); + auto json = fc::json::to_string(source_doc); + + bulk_account_upserts.updateDocument("_doc", id, json); + } + + try { + es_client->bulk_perform(bulk_account_upserts); + } catch( ... 
) { + handle_elasticsearch_exception( "upsert accounts " + bulk_account_upserts.body(), __LINE__ ); + } + }; + + upsert_account_task_queue.emplace( std::move(f) ); + + check_task_queue_size(); + thread_pool->enqueue( + [ this ]() + { + std::unique_lock guard(upsert_account_task_mtx); + std::function task = std::move( upsert_account_task_queue.front() ); + task(); + upsert_account_task_queue.pop(); + } + ); - bulker& bulk = bulk_pool->get(); - bulk.append_document(std::move(action), std::move(json)); - }); } - if( store_blocks ) { + if( base_action_traces.empty() ) return; //< do not index transaction_trace if all action_traces filtered out + check_task_queue_size(); + thread_pool->enqueue( + [ t{std::move(t)}, base_action_traces{std::move(base_action_traces)}, this ]() + { + const auto& trx_id = t->id; + const auto trx_id_str = trx_id.str(); + if ( store_action_traces ) { + for (auto& atrace : base_action_traces) { + fc::mutable_variant_object action_traces_doc; + chain::base_action_trace &base = atrace.get(); + fc::from_variant( serializer->to_variant_with_abi( base ), action_traces_doc ); - auto source = - "if (!ctx._source.containsKey(\"block_num\")) ctx._source.block_num = params.block_num;" - "if (!ctx._source.containsKey(\"block_id\")) ctx._source.block_id = params.block_id;" - "if (!ctx._source.containsKey(\"block\")) ctx._source.block = params.block;" - "if (!ctx._source.containsKey(\"irreversible\")) ctx._source.irreversible = params.irreversible;" - "ctx._source.createAt = params.createAt;"; + fc::mutable_variant_object act_doc; + fc::from_variant( action_traces_doc["act"], act_doc ); + act_doc["data"] = fc::json::to_string( act_doc["data"] ); - fc::mutable_variant_object doc; - fc::mutable_variant_object params_doc; - fc::mutable_variant_object script_doc; + action_traces_doc["act"] = act_doc; - params_doc("block_num", static_cast(block_num)); - params_doc("block_id", block_id_str); - params_doc("block", abi_deserializer->to_variant_with_abi( *bs->block 
)); - params_doc("irreversible", false); - params_doc("createAt", now.count()); + fc::mutable_variant_object action_doc; + action_doc("_index", action_traces_index); + action_doc("_type", "_doc"); + action_doc("_id", base.receipt.global_sequence); + action_doc("retry_on_conflict", 100); - script_doc("source", source); - script_doc("lang", "painless"); - script_doc("params", params_doc); + auto action = fc::json::to_string( fc::variant_object("index", action_doc) ); + auto json = fc::prune_invalid_utf8( fc::json::to_string(action_traces_doc) ); - doc("script", script_doc); - doc("scripted_upsert", true); - doc("upsert", fc::variant_object()); + bulker& bulk = bulk_pool->get(); + bulk.append_document(std::move(action), std::move(json)); + } + } + + if( store_transaction_traces ) { + // transaction trace index + + fc::mutable_variant_object trans_traces_doc; + fc::from_variant( serializer->to_variant_with_abi( *t ), trans_traces_doc ); - boost::asio::post( *thr_pool, - [ doc{std::move(doc)}, block_id_str, this ]() - { fc::mutable_variant_object action_doc; - action_doc("_index", blocks_index); + action_doc("_index", trans_traces_index); action_doc("_type", "_doc"); - action_doc("_id", block_id_str); + action_doc("_id", trx_id_str); action_doc("retry_on_conflict", 100); - - auto action = fc::json::to_string( fc::variant_object("update", action_doc) ); - auto json = fc::prune_invalid_utf8( fc::json::to_string( doc ) ); + auto action = fc::json::to_string( fc::variant_object("index", action_doc) ); + auto json = fc::prune_invalid_utf8( fc::json::to_string( trans_traces_doc ) ); bulker& bulk = bulk_pool->get(); bulk.append_document(std::move(action), std::move(json)); - }); - } + + } + } + ); + } -void elasticsearch_plugin_impl::_process_irreversible_block(const chain::block_state_ptr& bs) -{ - const auto block_id = bs->block->id(); - const auto block_id_str = block_id.str(); - const auto block_num = bs->block->block_num(); +void 
elasticsearch_plugin_impl::_process_accepted_transaction( chain::transaction_metadata_ptr t ) { + check_task_queue_size(); + thread_pool->enqueue( + [ t{std::move(t)}, this ]() + { + const signed_transaction& trx = t->packed_trx->get_signed_transaction(); + if( !filter_include( trx ) ) return; + + const auto& trx_id = t->id; + const auto trx_id_str = trx_id.str(); - auto now = std::chrono::duration_cast( - std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); + fc::mutable_variant_object trans_doc; + fc::mutable_variant_object doc; - auto source = - "ctx._source.validated = params.validated;" - "ctx._source.irreversible = params.irreversible;" - "ctx._source.updateAt = params.updateAt;"; + fc::from_variant( serializer->to_variant_with_abi( trx ), trans_doc ); + trans_doc("trx_id", trx_id_str); - fc::mutable_variant_object params_doc; - fc::mutable_variant_object script_doc; + fc::variant signing_keys; + if( t->signing_keys_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready ) { + signing_keys = std::get<2>(t->signing_keys_future.get()); + } else { + flat_set keys; + trx.get_signature_keys( *chain_id, fc::time_point::maximum(), keys, false ); + signing_keys = keys; + } + + if( !signing_keys.is_null() ) { + trans_doc("signing_keys", signing_keys); + } - params_doc("validated", bs->validated); - params_doc("irreversible", true); - params_doc("updateAt", now.count()); + trans_doc("accepted", t->accepted); + trans_doc("implicit", t->implicit); + trans_doc("scheduled", t->scheduled); - script_doc("source", source); - script_doc("lang", "painless"); - script_doc("params", params_doc); + doc("doc", trans_doc); + doc("doc_as_upsert", true); - if( store_block_states ) { - fc::mutable_variant_object doc; - fc::mutable_variant_object block_state_doc; + fc::mutable_variant_object action_doc; + action_doc("_index", trans_index); + action_doc("_type", "_doc"); + action_doc("_id", trx_id_str); + action_doc("retry_on_conflict", 100); - 
block_state_doc("block_num", static_cast(block_num)); - block_state_doc("block_id", block_id_str); - block_state_doc("block_header_state", bs); - block_state_doc("validated", bs->validated); - block_state_doc("irreversible", true); - block_state_doc("createAt", now.count()); + auto action = fc::json::to_string( fc::variant_object("update", action_doc) ); + auto json = fc::prune_invalid_utf8( fc::json::to_string( doc ) ); - doc("script", script_doc); - doc("upsert", block_state_doc); + bulker& bulk = bulk_pool->get(); + bulk.append_document(std::move(action), std::move(json)); + } + ); +} + +void elasticsearch_plugin_impl::_process_accepted_block( chain::block_state_ptr bs ) { + check_task_queue_size(); + thread_pool->enqueue( + [ bs{std::move(bs)}, this ]() + { + auto block_num = bs->block_num; + if( block_num % 10000 == 0 ) + ilog( "block_num: ${b}", ("b", block_num) ); + + const auto block_id = bs->id; + const auto block_id_str = block_id.str(); + + if( store_block_states ) { + auto source = "int v;"; // Do nothing if the document already exists.
+ + fc::mutable_variant_object doc; + fc::mutable_variant_object bs_doc(bs); + fc::mutable_variant_object script_doc; + + bs_doc.erase("block"); + script_doc("source", source); + script_doc("lang", "painless"); + + doc("script", script_doc); + doc("scripted_upsert", true); + doc("upsert", bs_doc); - boost::asio::post( *thr_pool, - [ doc{std::move(doc)}, block_id_str, this ]() - { fc::mutable_variant_object action_doc; action_doc("_index", block_states_index); action_doc("_type", "_doc"); @@ -711,269 +795,175 @@ void elasticsearch_plugin_impl::_process_irreversible_block(const chain::block_s action_doc("retry_on_conflict", 100); auto action = fc::json::to_string( fc::variant_object("update", action_doc) ); - auto json = fc::json::to_string( doc ); + auto json = fc::prune_invalid_utf8( fc::json::to_string( doc ) ); bulker& bulk = bulk_pool->get(); bulk.append_document(std::move(action), std::move(json)); - }); - } + } - if( store_blocks ) { - fc::mutable_variant_object doc; - fc::mutable_variant_object block_doc; + if( store_blocks ) { + auto source = "int v;"; // Do nothing if the document already exists.
- block_doc("block_num", static_cast(block_num)); - block_doc("block_id", block_id_str); - block_doc("block", abi_deserializer->to_variant_with_abi( *bs->block )); - block_doc("irreversible", true); - block_doc("validated", bs->validated); - block_doc("createAt", now.count()); + fc::mutable_variant_object doc; + fc::mutable_variant_object block_doc; + fc::mutable_variant_object script_doc; - doc("script", script_doc); - doc("upsert", block_doc); + fc::from_variant(serializer->to_variant_with_abi( *bs->block ), block_doc); + + script_doc("source", source); + script_doc("lang", "painless"); + + doc("script", script_doc); + doc("scripted_upsert", true); + doc("upsert", block_doc); - boost::asio::post( *thr_pool, - [ doc{std::move(doc)}, this, block_id_str ]() - { fc::mutable_variant_object action_doc; action_doc("_index", blocks_index); action_doc("_type", "_doc"); action_doc("_id", block_id_str); action_doc("retry_on_conflict", 100); - auto action = fc::json::to_string( fc::variant_object("update", action_doc) ); auto json = fc::prune_invalid_utf8( fc::json::to_string( doc ) ); bulker& bulk = bulk_pool->get(); bulk.append_document(std::move(action), std::move(json)); - }); - } - - if( !store_transactions ) return; - - std::vector< std::pair > trans_vec; - - for( const auto& receipt : bs->block->transactions ) { - string trx_id_str; - if( receipt.trx.contains() ) { - const auto& pt = receipt.trx.get(); - // get id via get_raw_transaction() as packed_transaction.id() mutates internal transaction state - const auto& raw = pt.get_raw_transaction(); - const auto& id = fc::raw::unpack( raw ).id(); - trx_id_str = id.str(); - } else { - const auto& id = receipt.trx.get(); - trx_id_str = id.str(); + } } - - fc::mutable_variant_object trans_doc; - fc::mutable_variant_object doc; - - trans_doc("irreversible", true); - trans_doc("block_id", block_id_str); - trans_doc("block_num", static_cast(block_num)); - trans_doc("updateAt", now.count()); - - doc("doc", trans_doc); - 
doc("doc_as_upsert", true); - - trans_vec.emplace_back( trx_id_str, std::move(doc) ); - } - - if ( !trans_vec.empty() ) { - boost::asio::post( *thr_pool, - [ trans_vec{std::move(trans_vec)}, this ]() - { - for (auto& entry : trans_vec) { - fc::mutable_variant_object action_doc; - action_doc("_index", trans_index); - action_doc("_type", "_doc"); - action_doc("_id", entry.first); - action_doc("retry_on_conflict", 100); - - auto action = fc::json::to_string( fc::variant_object("update", action_doc) ); - auto json = fc::json::to_string( entry.second ); - - bulker& bulk = bulk_pool->get(); - bulk.append_document(std::move(action), std::move(json)); - } - }); - } - + ); } -void elasticsearch_plugin_impl::_process_accepted_transaction( const chain::transaction_metadata_ptr& t ) { - fc::mutable_variant_object trans_doc; - fc::mutable_variant_object doc; - - auto now = std::chrono::duration_cast( - std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()} ); - - const auto& trx_id = t->id; - const auto trx_id_str = trx_id.str(); - const auto& trx = t->trx; - - fc::from_variant( abi_deserializer->to_variant_with_abi( trx ), trans_doc ); - trans_doc("trx_id", trx_id_str); - - fc::variant signing_keys; - if( t->signing_keys.valid() ) { - signing_keys = t->signing_keys->second; - } else { - signing_keys = trx.get_signature_keys( *chain_id, false, false ); - } - - if( !signing_keys.is_null() ) { - trans_doc("signing_keys", signing_keys); - } - - trans_doc("accepted", t->accepted); - trans_doc("implicit", t->implicit); - trans_doc("scheduled", t->scheduled); - trans_doc("createdAt", now.count()); - - doc("doc", trans_doc); - doc("doc_as_upsert", true); - - boost::asio::post( *thr_pool, - [ doc{std::move(doc)}, trx_id_str, this ]() +void elasticsearch_plugin_impl::_process_irreversible_block(chain::block_state_ptr bs) { + check_task_queue_size(); + thread_pool->enqueue( + [ bs{std::move(bs)}, this ]() { - fc::mutable_variant_object action_doc; - 
action_doc("_index", trans_index); - action_doc("_type", "_doc"); - action_doc("_id", trx_id_str); - action_doc("retry_on_conflict", 100); + const auto block_id = bs->block->id(); + const auto block_id_str = block_id.str(); + const auto block_num = bs->block->block_num(); - auto action = fc::json::to_string( fc::variant_object("update", action_doc) ); - auto json = fc::prune_invalid_utf8( fc::json::to_string( doc ) ); + auto source = + "ctx._source.validated = params.validated;" + "ctx._source.irreversible = params.irreversible;"; - bulker& bulk = bulk_pool->get(); - bulk.append_document(std::move(action), std::move(json)); - }); -} + fc::mutable_variant_object params_doc; + fc::mutable_variant_object script_doc; -void elasticsearch_plugin_impl::_process_applied_transaction( const chain::transaction_trace_ptr& t ) { - auto now = std::chrono::duration_cast( - std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); + params_doc("validated", bs->validated); + params_doc("irreversible", true); - elasticlient::SameIndexBulkData bulk_account_upserts(accounts_index); + script_doc("source", source); + script_doc("lang", "painless"); + script_doc("params", params_doc); - std::unordered_map> account_upsert_actions; - std::vector> base_action_traces; // without inline action traces + if( store_block_states ) { + fc::mutable_variant_object doc; + fc::mutable_variant_object bs_doc(bs); - bool executed = t->receipt.valid() && t->receipt->status == chain::transaction_receipt_header::executed; + bs_doc.erase("block"); + bs_doc("irreversible", true); - std::stack> stack; - for( auto& atrace : t->action_traces ) { - stack.emplace(atrace); + doc("script", script_doc); + doc("upsert", bs_doc); - while ( !stack.empty() ) - { - auto &atrace = stack.top().get(); - stack.pop(); + fc::mutable_variant_object action_doc; + action_doc("_index", block_states_index); + action_doc("_type", "_doc"); + action_doc("_id", block_id_str); + action_doc("retry_on_conflict", 100); - 
if( executed && atrace.receipt.receiver == chain::config::system_account_name ) { - upsert_account( account_upsert_actions, atrace.act, atrace.block_time ); - } + auto action = fc::json::to_string( fc::variant_object("update", action_doc) ); + auto json = fc::prune_invalid_utf8( fc::json::to_string( doc ) ); - if( start_block_reached && store_action_traces && filter_include( atrace ) ) { - base_action_traces.emplace_back(atrace); + bulker& bulk = bulk_pool->get(); + bulk.append_document(std::move(action), std::move(json)); } - auto &inline_traces = atrace.inline_traces; - for( auto it = inline_traces.rbegin(); it != inline_traces.rend(); ++it ) { - stack.emplace(*it); - } - } - } + if( store_blocks ) { + fc::mutable_variant_object doc; + fc::mutable_variant_object block_doc; - for( auto& action : account_upsert_actions ) { + fc::from_variant(serializer->to_variant_with_abi( *bs->block ), block_doc); + block_doc("irreversible", true); + block_doc("validated", bs->validated); - fc::mutable_variant_object source_doc; - fc::mutable_variant_object script_doc; + doc("script", script_doc); + doc("upsert", block_doc); - script_doc("lang", "painless"); - script_doc("source", action.second.first); - script_doc("params", action.second.second); + fc::mutable_variant_object action_doc; + action_doc("_index", blocks_index); + action_doc("_type", "_doc"); + action_doc("_id", block_id_str); + action_doc("retry_on_conflict", 100); - source_doc("scripted_upsert", true); - source_doc("upsert", fc::variant_object()); - source_doc("script", script_doc); + auto action = fc::json::to_string( fc::variant_object("update", action_doc) ); + auto json = fc::prune_invalid_utf8( fc::json::to_string( doc ) ); - auto id = std::to_string(action.first); - auto json = fc::json::to_string(source_doc); + bulker& bulk = bulk_pool->get(); + bulk.append_document(std::move(action), std::move(json)); + } - bulk_account_upserts.updateDocument("_doc", id, json); - } + if( store_transactions ) { + + for( 
const auto& receipt : bs->block->transactions ) { + string trx_id_str; + if( receipt.trx.contains() ) { + const auto& pt = receipt.trx.get(); + // get id via get_raw_transaction() as packed_transaction.id() mutates internal transaction state + const auto& raw = pt.get_raw_transaction(); + const auto& trx = fc::raw::unpack( raw ); + if( !filter_include( trx ) ) continue; + const auto& id = trx.id(); + trx_id_str = id.str(); + } else { + const auto& id = receipt.trx.get(); + trx_id_str = id.str(); + } - if ( !bulk_account_upserts.empty() ) { - try { - es_client->bulk_perform(bulk_account_upserts); - } catch( ... ) { - handle_elasticsearch_exception( "upsert accounts " + bulk_account_upserts.body(), __LINE__ ); - } - } + fc::mutable_variant_object trans_doc; + fc::mutable_variant_object doc; - std::vector actions_vec; - for ( auto& atrace : base_action_traces) { - fc::mutable_variant_object action_traces_doc; - chain::base_action_trace &base = atrace.get(); - fc::from_variant( abi_deserializer->to_variant_with_abi( base ), action_traces_doc ); - action_traces_doc("createdAt", now.count()); - actions_vec.push_back( std::move(action_traces_doc) ); - } + trans_doc("irreversible", true); + trans_doc("block_id", block_id_str); + trans_doc("block_num", static_cast(block_num)); - bool no_actions = actions_vec.empty(); + doc("doc", trans_doc); + doc("doc_as_upsert", true); - if ( !no_actions ) { - boost::asio::post( *thr_pool, - [ actions_vec{std::move(actions_vec)}, this ]() - { - for (auto& entry : actions_vec) { fc::mutable_variant_object action_doc; - action_doc("_index", action_traces_index); + action_doc("_index", trans_index); action_doc("_type", "_doc"); + action_doc("_id", trx_id_str); action_doc("retry_on_conflict", 100); - auto action = fc::json::to_string( fc::variant_object("index", action_doc) ); - auto json = fc::prune_invalid_utf8( fc::json::to_string(entry) ); + auto action = fc::json::to_string( fc::variant_object("update", action_doc) ); + auto json = 
fc::prune_invalid_utf8( fc::json::to_string( doc ) ); bulker& bulk = bulk_pool->get(); bulk.append_document(std::move(action), std::move(json)); } - }); - } - - if( no_actions ) return; //< do not index transaction_trace if all action_traces filtered out - if( !start_block_reached || !store_transaction_traces ) return; - - // transaction trace index - fc::mutable_variant_object trans_traces_doc; - fc::from_variant( abi_deserializer->to_variant_with_abi( *t ), trans_traces_doc ); - trans_traces_doc("createAt", now.count()); - - boost::asio::post( *thr_pool, - [ trans_traces_doc{std::move(trans_traces_doc)}, this ]() - { - fc::mutable_variant_object action_doc; - action_doc("_index", trans_traces_index); - action_doc("_type", "_doc"); - action_doc("retry_on_conflict", 100); - - auto action = fc::json::to_string( fc::variant_object("index", action_doc) ); - auto json = fc::prune_invalid_utf8( fc::json::to_string( trans_traces_doc ) ); - - bulker& bulk = bulk_pool->get(); - bulk.append_document(std::move(action), std::move(json)); + } + } + ); +} - }); +void elasticsearch_plugin_impl::check_task_queue_size() { + auto task_queue_size = thread_pool->queue_size(); + if ( task_queue_size > max_task_queue_size ) { + task_queue_sleep_time += 10; + if( task_queue_sleep_time > 1000 ) + wlog("thread pool task queue size: ${q}", ("q", task_queue_size)); + std::this_thread::sleep_for( std::chrono::milliseconds( task_queue_sleep_time )); + } else { + task_queue_sleep_time -= 10; + if( task_queue_sleep_time < 0 ) task_queue_sleep_time = 0; + } } void elasticsearch_plugin_impl::consume_blocks() { try { while (true) { - boost::mutex::scoped_lock lock(mtx); + std::unique_lock lock(mtx); while ( transaction_metadata_queue.empty() && transaction_trace_queue.empty() && block_state_queue.empty() && @@ -1079,34 +1069,19 @@ void elasticsearch_plugin_impl::consume_blocks() { } } - -void elasticsearch_plugin_impl::delete_index() { - ilog("drop elasticsearch index"); - es_client->delete_index( 
accounts_index ); - es_client->delete_index( blocks_index ); - es_client->delete_index( trans_index ); - es_client->delete_index( block_states_index ); - es_client->delete_index( trans_traces_index ); - es_client->delete_index( action_traces_index ); -} - void elasticsearch_plugin_impl::init() { ilog("create elasticsearch index"); - es_client->init_index( accounts_index, accounts_mapping ); - es_client->init_index( blocks_index, blocks_mapping ); - es_client->init_index( trans_index, trans_mapping ); - es_client->init_index( block_states_index, block_states_mapping ); - es_client->init_index( trans_traces_index, trans_traces_mapping ); - es_client->init_index( action_traces_index, action_traces_mapping ); + es_client->init_index( accounts_index, ""); + es_client->init_index( blocks_index, "" ); + es_client->init_index( trans_index, "" ); + es_client->init_index( block_states_index, "" ); + es_client->init_index( trans_traces_index, "" ); + es_client->init_index( action_traces_index, "" ); if (es_client->count_doc(accounts_index) == 0) { - auto now = std::chrono::duration_cast( - std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); - fc::mutable_variant_object account_doc; auto acc_name = chain::config::system_account_name; account_doc("name", name( acc_name ).to_string()); - account_doc("createAt", now.count()); account_doc("pub_keys", fc::variants()); account_doc("account_controls", fc::variants()); auto json = fc::json::to_string(account_doc); @@ -1118,7 +1093,7 @@ void elasticsearch_plugin_impl::init() { } ilog("starting elasticsearch plugin thread"); - consume_thread = boost::thread([this] { consume_blocks(); }); + consume_thread = std::thread([this] { consume_blocks(); }); startup = false; } @@ -1130,17 +1105,12 @@ void elasticsearch_plugin::set_program_options(options_description&, options_des cfg.add_options() ("elastic-queue-size,q", bpo::value()->default_value(1024), "The target queue size between nodeos and elasticsearch plugin 
thread.") - ("elastic-abi-cache-size", bpo::value()->default_value(2048), - "The maximum size of the abi cache for serializing data.") ("elastic-thread-pool-size", bpo::value()->default_value(4), "The size of the data processing thread pool.") - ("elastic-bulker-pool-size", bpo::value()->default_value(2), - "The size of the elasticsearch bulker pool.") - ("elastic-bulk-size", bpo::value()->default_value(5), + ("elastic-bulk-size-mb", bpo::value()->default_value(5), "The size(megabytes) of the each bulk request.") - ("elastic-index-wipe", bpo::bool_switch()->default_value(false), - "Required with --replay-blockchain, --hard-replay-blockchain, or --delete-all-blocks to delete elasticsearch index." - "This option required to prevent accidental wipe of index.") + ("elastic-abi-db-size-mb", bpo::value()->default_value(1024), + "Maximum size(megabytes) of the abi database.") ("elastic-block-start", bpo::value()->default_value(0), "If specified then only abi data pushed to elasticsearch until specified block is reached.") ("elastic-url,u", bpo::value(), @@ -1163,6 +1133,18 @@ void elasticsearch_plugin::set_program_options(options_description&, options_des "Track actions which match receiver:action:actor. Receiver, Action, & Actor may be blank to include all. i.e. eosio:: or :transfer: Use * or leave unspecified to include all.") ("elastic-filter-out", bpo::value>()->composing(), "Do not track actions which match receiver:action:actor. 
Receiver, Action, & Actor may be blank to exclude all.") + ("elastic-index-accounts", bpo::value()->default_value("accounts"), + "elasticsearch accounts index name.") + ("elastic-index-blocks", bpo::value()->default_value("blocks"), + "elasticsearch blocks index name.") + ("elastic-index-transactions", bpo::value()->default_value("transactions"), + "elasticsearch transactions index name.") + ("elastic-index-block-states", bpo::value()->default_value("block_states"), + "elasticsearch block_states index name.") + ("elastic-index-transaction-traces", bpo::value()->default_value("transaction_traces"), + "elasticsearch transaction_traces index name.") + ("elastic-index-action-traces", bpo::value()->default_value("action_traces"), + "elasticsearch action_traces index name.") ; } @@ -1172,28 +1154,18 @@ void elasticsearch_plugin::plugin_initialize(const variables_map& options) { ilog( "initializing elasticsearch_plugin" ); my->configured = true; - if( options.at( "replay-blockchain" ).as() || options.at( "hard-replay-blockchain" ).as() || options.at( "delete-all-blocks" ).as() ) { - if( options.at( "elastic-index-wipe" ).as()) { - ilog( "Wiping elascticsearch index on startup" ); - my->delete_index_on_startup = true; - } else if( options.count( "elastic-block-start" ) == 0 ) { - EOS_ASSERT( false, chain::plugin_config_exception, "--elastic-index-wipe required with --replay-blockchain, --hard-replay-blockchain, or --delete-all-blocks" - " --elastic-index-wipe will remove EOS index from elasticsearch." 
); - } + if( options.count( "abi-serializer-max-time-ms" )) { + uint32_t max_time = options.at( "abi-serializer-max-time-ms" ).as(); + EOS_ASSERT(max_time > chain::config::default_abi_serializer_max_time_ms, + chain::plugin_config_exception, "--abi-serializer-max-time-ms required as default value not appropriate for parsing full blocks"); + fc::microseconds abi_serializer_max_time = app().get_plugin().get_abi_serializer_max_time(); + auto db_size = options.at( "elastic-abi-db-size-mb" ).as(); + my->serializer.reset(new serializer(app().data_dir() / "abi", abi_serializer_max_time, db_size*1024*1024ll)); } - if( options.count( "abi-serializer-max-time-ms") == 0 ) { - EOS_ASSERT(false, chain::plugin_config_exception, "--abi-serializer-max-time-ms required as default value not appropriate for parsing full blocks"); - } - my->abi_serializer_max_time = app().get_plugin().get_abi_serializer_max_time(); - if( options.count( "elastic-queue-size" )) { my->max_queue_size = options.at( "elastic-queue-size" ).as(); } - if( options.count( "elastic-abi-cache-size" )) { - my->abi_cache_size = options.at( "elastic-abi-cache-size" ).as(); - EOS_ASSERT( my->abi_cache_size > 0, chain::plugin_config_exception, "elastic-abi-cache-size > 0 required" ); - } if( options.count( "elastic-block-start" )) { my->start_block_num = options.at( "elastic-block-start" ).as(); } @@ -1244,25 +1216,33 @@ void elasticsearch_plugin::plugin_initialize(const variables_map& options) { my->start_block_reached = true; } + my->accounts_index = options.at("elastic-index-accounts").as(); + my->blocks_index = options.at("elastic-index-blocks").as(); + my->trans_index = options.at("elastic-index-transactions").as(); + my->block_states_index = options.at("elastic-index-block-states").as(); + my->trans_traces_index = options.at("elastic-index-transaction-traces").as(); + my->action_traces_index = options.at("elastic-index-action-traces").as(); + std::string url_str = options.at( "elastic-url" ).as(); if ( 
url_str.back() != '/' ) url_str.push_back('/'); std::string user_str = options.at( "elastic-user" ).as(); std::string password_str = options.at( "elastic-password" ).as(); size_t thr_pool_size = options.at( "elastic-thread-pool-size" ).as(); - size_t bulk_pool_size = options.at( "elastic-bulker-pool-size" ).as(); - size_t bulk_size = options.at( "elastic-bulk-size" ).as(); + size_t bulk_size = options.at( "elastic-bulk-size-mb" ).as(); my->es_client.reset( new elastic_client(std::vector({url_str}), user_str, password_str) ); - my->abi_deserializer.reset( new deserializer( - my->abi_cache_size, my->abi_serializer_max_time, std::vector({url_str}), user_str, password_str) ); + ilog("init thread pool, size: ${tps}", ("tps", thr_pool_size)); - my->thr_pool.reset( new boost::asio::thread_pool(thr_pool_size) ); - ilog("init bulker pool, size: ${bps}, bulk size: ${bs}mb", ("bps", bulk_pool_size)("bs", bulk_size)); - my->bulk_pool.reset( new bulker_pool(bulk_pool_size, bulk_size * 1024 * 1024, std::vector({url_str}), user_str, password_str) ); + my->thread_pool.reset( new ThreadPool(thr_pool_size) ); + my->max_task_queue_size = my->max_queue_size * 8; + + ilog("bulk request size: ${bs}mb", ("bs", bulk_size)); + my->bulk_pool.reset( new bulker_pool(thr_pool_size, bulk_size * 1024 * 1024, + std::vector({url_str}), user_str, password_str) ); // hook up to signals on controller chain_plugin* chain_plug = app().find_plugin(); - EOS_ASSERT( chain_plug, chain::missing_chain_plugin_exception, "" ); + EOS_ASSERT( chain_plug, chain::missing_chain_plugin_exception, "" ); auto& chain = chain_plug->chain(); my->chain_id.emplace( chain.get_chain_id()); @@ -1282,9 +1262,7 @@ void elasticsearch_plugin::plugin_initialize(const variables_map& options) { chain.applied_transaction.connect( [&]( const chain::transaction_trace_ptr& t ) { my->applied_transaction( t ); } )); - if( my->delete_index_on_startup ) { - my->delete_index(); - } + my->init(); } else { wlog( "eosio::elasticsearch_plugin 
configured, but no --elastic-url specified." ); diff --git a/mappings.hpp b/mappings.hpp deleted file mode 100644 index 802e7e2..0000000 --- a/mappings.hpp +++ /dev/null @@ -1,153 +0,0 @@ -#include - -namespace eosio { - -const static std::string accounts_mapping = R"( -{ - "mappings": { - "_doc": { - "properties": { - "name": { - "type": "text" - }, - "creator": { - "type": "text" - }, - "pub_keys": { - "type": "nested" - }, - "account_controls": { - "type": "nested" - }, - "abi": { - "enabled": false - }, - "account_create_time": { - "type": "date" - }, - "createAt": { - "type": "date" - }, - "updateAt": { - "type": "date" - } - } - } - } -} -)"; - -static const std::string blocks_mapping = R"( -{ - "mappings": { - "_doc": { - "properties": { - "createAt": { - "type": "date" - }, - "updateAt": { - "type": "date" - }, - "block": { - "enabled": false - } - } - } - } -} -)"; - -static const std::string trans_mapping = R"( -{ - "mappings": { - "_doc": { - "properties": { - "createAt": { - "type": "date" - }, - "updateAt": { - "type": "date" - }, - "actions": { - "enabled": false - } - } - } - } -} -)"; - - -static const std::string block_states_mapping = R"( -{ - "mappings": { - "_doc": { - "properties": { - "createAt": { - "type": "date" - }, - "updateAt": { - "type": "date" - }, - "block_header_state": { - "enabled": false - } - } - } - } -} -)"; - -static const std::string trans_traces_mapping = R"( -{ - "mappings": { - "_doc": { - "properties": { - "createAt": { - "type": "date" - }, - "updateAt": { - "type": "date" - }, - "action_traces": { - "enabled": false - }, - "failed_dtrx_trace": { - "enabled": false - }, - "except": { - "enabled": false - } - } - } - } -} -)"; - -static const std::string action_traces_mapping = R"( -{ - "mappings": { - "_doc": { - "properties": { - "createAt": { - "type": "date" - }, - "updateAt": { - "type": "date" - }, - "receipt": { - "enabled": false - }, - "act": { - "enabled": false - }, - "account_ram_deltas": { - "enabled": false 
- } - } - } - } -} -)"; - -} diff --git a/serializer.hpp b/serializer.hpp new file mode 100644 index 0000000..7257867 --- /dev/null +++ b/serializer.hpp @@ -0,0 +1,134 @@ +#pragma once +#include +#include + +using namespace eosio; +using namespace chainbase; + +struct by_account; + +class abi_cache: public chainbase::object<0, abi_cache> { + OBJECT_CTOR(abi_cache,(abi)) + + id_type id; + account_name account; + eosio::chain::shared_blob abi; + + void set_abi( const eosio::chain::abi_def& a ) { + abi.resize( fc::raw::pack_size( a ) ); + fc::datastream ds( abi.data(), abi.size() ); + fc::raw::pack( ds, a ); + } +}; + + +using abi_cache_index_t = chainbase::shared_multi_index_container >, + ordered_unique< tag, member > + > +>; + +CHAINBASE_SET_INDEX_TYPE( abi_cache, abi_cache_index_t ) + + +class serializer +{ +public: + serializer(const bfs::path& dir, fc::microseconds abi_serializer_max_time, uint64_t db_size) + :db(dir, database::read_write, db_size), abi_serializer_max_time(abi_serializer_max_time) + { + db.add_index(); + } + + optional get_abi_serializer( const account_name &name ) { + if( name.good() ) { + try { + const auto& a = db.get(name); + abi_def abi; + if( abi_serializer::to_abi( a.abi, abi )) + return abi_def_to_serializer(name, abi); + } catch( std::out_of_range& e) { + // ignore missing abi exception. 
+ } FC_CAPTURE_AND_LOG((name)) + } + return optional(); + } + + template + fc::variant to_variant_with_abi( const T& obj ) { + fc::variant pretty_output; + abi_serializer::to_variant( obj, pretty_output, + [&]( account_name n ) { return get_abi_serializer( n ); }, + abi_serializer_max_time ); + return pretty_output; + } + + void upsert_abi_cache( const account_name &name, const abi_def& abi ) { + if( name.good()) { + try { + auto* a = db.find(name); + if ( a == nullptr ) { + db.create( [&]( abi_cache& ca ) { + ca.account = name; + ca.set_abi(abi); + }); + } else { + db.modify( *a, [&]( abi_cache& ca ) { + ca.set_abi(abi); + }); + } + } FC_CAPTURE_AND_LOG((name)) + } + } + +private: + + chainbase::database db; + fc::microseconds abi_serializer_max_time; + + optional abi_def_to_serializer( const account_name &name, const abi_def& abi ) { + if( name.good()) { + try { + abi_serializer abis; + + if( name == chain::config::system_account_name ) { + // redefine eosio setabi.abi from bytes to abi_def + // Done so that abi is stored as abi_def in elasticsearch instead of as bytes + abi_def abi_new = abi; + auto itr = std::find_if( abi_new.structs.begin(), abi_new.structs.end(), + []( const auto& s ) { return s.name == "setabi"; } ); + if( itr != abi_new.structs.end() ) { + auto itr2 = std::find_if( itr->fields.begin(), itr->fields.end(), + []( const auto& f ) { return f.name == "abi"; } ); + if( itr2 != itr->fields.end() ) { + if( itr2->type == "bytes" ) { + itr2->type = "abi_def"; + // unpack setabi.abi as abi_def instead of as bytes + abis.add_specialized_unpack_pack( "abi_def", + std::make_pair( + []( fc::datastream& stream, bool is_array, bool is_optional ) -> fc::variant { + EOS_ASSERT( !is_array && !is_optional, chain::elasticsearch_exception, "unexpected abi_def"); + chain::bytes temp; + fc::raw::unpack( stream, temp ); + return fc::variant( fc::raw::unpack( temp ) ); + }, + []( const fc::variant& var, fc::datastream& ds, bool is_array, bool is_optional ) { + 
EOS_ASSERT( false, chain::elasticsearch_exception, "never called" ); + } + ) ); + } + } + } + abis.set_abi( abi_new, abi_serializer_max_time ); + } else { + abis.set_abi( abi, abi_serializer_max_time ); + } + + return abis; + } FC_CAPTURE_AND_LOG((name)) + } + return optional(); + } + +}; diff --git a/set-up-an-elasticsearch-cluster-stores-eos-blockchain-data.md b/set-up-an-elasticsearch-cluster-stores-eos-blockchain-data.md new file mode 100644 index 0000000..3735c19 --- /dev/null +++ b/set-up-an-elasticsearch-cluster-stores-eos-blockchain-data.md @@ -0,0 +1,126 @@ +# Set up an elasticsearch cluster that stores eos blockchain data + +Several weeks ago, our team set up an elasticsearch cluster and replayed a node with the [elasticsearch_plugin](https://github.com/EOSLaoMao/elasticsearch_plugin) added, starting from scratch. It took us about one week to catch up with the head block of the chain. At the time of writing, the head block number is 36177553 and the data size of the cluster is around 2.0 terabytes. This post explains how we managed to do that. + +## Set up elasticsearch cluster + +When replaying an eos node with the elasticsearch plugin, the bottleneck is elasticsearch indexing speed, so our goal is to maximize the indexing performance of the elasticsearch cluster. Faster hardware, such as more RAM and SSDs, helps the cluster perform better. In addition, a proper configuration saves both time and disk space. + +### Elasticsearch configuration + +The elasticsearch plugin produces several kinds of documents that correspond to data structures predefined in nodeos. We made some modifications in the last commit to remove some redundancy. You can check out the examples here: [Document examples](https://github.com/EOSLaoMao/elasticsearch_plugin/tree/feature/block-structure-change#document-examples). + +An index template is a great tool for managing mappings and settings. In our default template, we set `index.refresh_interval` to -1 and `index.number_of_replicas` to 0 for better indexing performance. 
We also set index sorting and best compression to save disk space. Note that the type of `act.data` is set to `text` because we want the action data field, whose structure varies between actions, to be searchable. + +The default template of the action_traces index: +```json +{ + "index_patterns" : ["action_traces*"], + "settings": { + "index": { + "number_of_shards": 3, + "refresh_interval": -1, + "number_of_replicas": 0, + "sort.field" : "block_num", + "sort.order" : "desc" + }, + "index.codec": "best_compression" + }, + "mappings": { + "_doc": { + "properties": { + "trx_id": { + "type": "keyword" + }, + "producer_block_id": { + "type": "keyword" + }, + ... + "act.data": { + "type": "text" + }, + ... + } + } + } +} +``` +More detail about templates: [elasticsearch-node/templates](https://github.com/EOSLaoMao/elasticsearch-node/tree/master/templates) + +It is important to set up index rollover. We set up a cron job that runs curator rollover actions every 10 minutes. This avoids the drop in indexing performance caused by very large shards. By adjusting `conditions.max_size` in the curator rollover action file and `number_of_shards` in the index template, all shards are kept at a reasonable size. + +The curator rollover action for the action_traces index: +```yml +actions: + ... + 2: + action: rollover + description: Rollover the index action_traces + options: + disable_action: False + continue_if_exception: True + name: action_traces + conditions: + max_age: 365d + max_docs: 300000000 + max_size: 100gb + ... +``` +More detail about curator actions: [elasticsearch-node/curator](https://github.com/EOSLaoMao/elasticsearch-node/tree/master/curator) + +### Elasticsearch plugin configuration + +It is recommended that the plugin be added to a node used only for data archiving. It is also recommended that read-only mode be used to avoid speculative execution. 
Forked data (data that never becomes irreversible) is still recorded, but speculative transaction processing and signaling are avoided, minimizing the transaction_traces/action_traces stored. + +As with the mongodb plugin, action data is stored on chain as raw bytes. This plugin attempts to use the ABI associated with each account to deserialize the raw bytes into expanded abi_def form for storage in elasticsearch. Inside the plugin, ABI information is stored in a [chainbase](https://github.com/EOSIO/chainbase) database whose maximum size is controlled by the option `--elastic-abi-db-size-mb`. Note that an invalid or missing ABI on a contract will result in the action data being stored as raw bytes. For example, the eosio system contract does not provide an ABI for the onblock action, so it is stored as raw bytes. + +A larger `--abi-serializer-max-time-ms` value must be passed into the nodeos running the elasticsearch_plugin, as the default abi serializer time limit is not large enough to serialize large blocks. + +The plugin sends bulk requests from multiple threads. Adjust the thread pool size and bulk request size to make better use of the cluster's resources. + +There are several switches to control what kinds of documents the plugin sends. In our test run, we chose not to store block data by turning the option `elastic-store-block` off. + +Filter out spam account actions to lower the indexing workload, or filter in specific actions if you are only interested in certain actions from certain accounts. Note that transactions that contain only spam actions will also be filtered out. + + +Our nodeos configuration: +```ini +... 
+# set max serialize time to 1000000 ms (1000 seconds) +abi-serializer-max-time-ms = 1000000 + +read-mode = read-only + +elastic-url = http://localhost:9200/ +elastic-thread-pool-size = 16 +elastic-bulk-size-mb = 20 + +# do not store block data +elastic-store-block = 0 + +elastic-filter-out = eosio:onblock: +elastic-filter-out = gu2tembqgage:: +elastic-filter-out = blocktwitter:: +elastic-filter-out = 1hello1world:: +elastic-filter-out = betdicealert:: +elastic-filter-out = myeosgateway:: +elastic-filter-out = eosonthefly1:: +elastic-filter-out = cryptohongbo:: +elastic-filter-out = experimentms:: +elastic-filter-out = eosplayaloud:: +elastic-filter-out = message.bank:: +elastic-filter-out = eospromoter1:: +elastic-filter-out = eospromotera:: +elastic-filter-out = watchdoggiee:: +elastic-filter-out = eoseosaddddd:: +elastic-filter-out = eosblackdrop:: + +# load elasticsearch plugin +plugin = eosio::elasticsearch_plugin +... +``` + +## Resources: +- [Tune for Indexing Speed](https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html) +- [Tune For Disk Usage](https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-disk-usage.html) +- [How many shards should I have in my Elasticsearch cluster?](https://www.elastic.co/blog/how-many-shards-should-i-have-in-my-elasticsearch-cluster)
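
Note on the throttling added in `check_task_queue_size` earlier in this diff: the function grows a sleep time by 10 ms each time the thread pool's task queue exceeds `max_task_queue_size`, and shrinks it by 10 ms (never below zero) otherwise. The following is a minimal stand-alone sketch of that additive backoff, assuming nothing beyond what the diff shows. The names `backoff_state` and `next_sleep_ms` are hypothetical and do not exist in the plugin; the sketch returns the delay instead of calling `std::this_thread::sleep_for` so the logic can be checked in isolation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical model of the plugin's task_queue_sleep_time member.
struct backoff_state {
   int sleep_ms = 0;  // current delay, in milliseconds
};

// Returns how long the caller should sleep before queueing more work.
// queue_size and max_queue_size stand in for thread_pool->queue_size()
// and max_task_queue_size; step_ms mirrors the 10 ms step in the diff.
int next_sleep_ms(backoff_state& s, std::size_t queue_size,
                  std::size_t max_queue_size, int step_ms = 10) {
   assert(step_ms > 0);
   if (queue_size > max_queue_size) {
      // Queue is over the limit: back off a little harder each time.
      s.sleep_ms += step_ms;
      return s.sleep_ms;
   }
   // Queue is within the limit: relax the delay, clamped at zero.
   s.sleep_ms = std::max(0, s.sleep_ms - step_ms);
   return 0;
}
```

Because the delay changes by a fixed step in both directions, a short burst of queued tasks only slows the producer briefly, while sustained pressure keeps raising the delay until the thread pool catches up.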