diff --git a/newrelic/config.py b/newrelic/config.py index 2a57764b5..d3dd12764 100644 --- a/newrelic/config.py +++ b/newrelic/config.py @@ -3066,15 +3066,30 @@ def _process_module_builtin_defaults(): _process_module_definition( "elasticsearch.client", "newrelic.hooks.datastore_elasticsearch", "instrument_elasticsearch_client" ) + _process_module_definition( + "elasticsearch._async.client", + "newrelic.hooks.datastore_elasticsearch", + "instrument_elasticsearch__async_client", + ) # v8 and above _process_module_definition( "elasticsearch._sync.client", "newrelic.hooks.datastore_elasticsearch", "instrument_elasticsearch_client_v8" ) + _process_module_definition( + "elasticsearch._async.client", + "newrelic.hooks.datastore_elasticsearch", + "instrument_elasticsearch__async_client_v8", + ) # v7 and below _process_module_definition( "elasticsearch.client.cat", "newrelic.hooks.datastore_elasticsearch", "instrument_elasticsearch_client_cat" ) + _process_module_definition( + "elasticsearch._async.client.cat", + "newrelic.hooks.datastore_elasticsearch", + "instrument_elasticsearch__async_client_cat", + ) # v8 and above _process_module_definition( "elasticsearch._sync.client.cat", @@ -3088,30 +3103,43 @@ def _process_module_builtin_defaults(): "newrelic.hooks.datastore_elasticsearch", "instrument_elasticsearch_client_cluster", ) + _process_module_definition( + "elasticsearch._async.client.cluster", + "newrelic.hooks.datastore_elasticsearch", + "instrument_elasticsearch__async_client_cluster", + ) # v8 and above _process_module_definition( "elasticsearch._sync.client.cluster", "newrelic.hooks.datastore_elasticsearch", "instrument_elasticsearch_client_cluster_v8", ) - # v7 and below _process_module_definition( "elasticsearch.client.indices", "newrelic.hooks.datastore_elasticsearch", "instrument_elasticsearch_client_indices", ) + _process_module_definition( + "elasticsearch._async.client.indices", + "newrelic.hooks.datastore_elasticsearch", + "instrument_elasticsearch__async_client_indices", + ) # v8 and above _process_module_definition( "elasticsearch._sync.client.indices", "newrelic.hooks.datastore_elasticsearch", "instrument_elasticsearch_client_indices_v8", ) - # v7 and below _process_module_definition( "elasticsearch.client.nodes", "newrelic.hooks.datastore_elasticsearch", "instrument_elasticsearch_client_nodes" ) + _process_module_definition( + "elasticsearch._async.client.nodes", + "newrelic.hooks.datastore_elasticsearch", + "instrument_elasticsearch__async_client_nodes", + ) # v8 and above _process_module_definition( "elasticsearch._sync.client.nodes", @@ -3125,6 +3153,11 @@ def _process_module_builtin_defaults(): "newrelic.hooks.datastore_elasticsearch", "instrument_elasticsearch_client_snapshot", ) + _process_module_definition( + "elasticsearch._async.client.snapshot", + "newrelic.hooks.datastore_elasticsearch", + "instrument_elasticsearch__async_client_snapshot", + ) # v8 and above _process_module_definition( "elasticsearch._sync.client.snapshot", @@ -3136,6 +3169,11 @@ def _process_module_builtin_defaults(): _process_module_definition( "elasticsearch.client.tasks", "newrelic.hooks.datastore_elasticsearch", "instrument_elasticsearch_client_tasks" ) + _process_module_definition( + "elasticsearch._async.client.tasks", + "newrelic.hooks.datastore_elasticsearch", + "instrument_elasticsearch__async_client_tasks", + ) # v8 and above _process_module_definition( "elasticsearch._sync.client.tasks", @@ -3149,6 +3187,11 @@ def _process_module_builtin_defaults(): 
"newrelic.hooks.datastore_elasticsearch", "instrument_elasticsearch_client_ingest", ) + _process_module_definition( + "elasticsearch._async.client.ingest", + "newrelic.hooks.datastore_elasticsearch", + "instrument_elasticsearch__async_client_ingest", + ) # v8 and above _process_module_definition( "elasticsearch._sync.client.ingest", @@ -3162,23 +3205,43 @@ def _process_module_builtin_defaults(): "newrelic.hooks.datastore_elasticsearch", "instrument_elasticsearch_connection_base", ) + _process_module_definition( + "elasticsearch._async.http_aiohttp", + "newrelic.hooks.datastore_elasticsearch", + "instrument_async_elasticsearch_connection_base", + ) # v8 and above _process_module_definition( "elastic_transport._node._base", "newrelic.hooks.datastore_elasticsearch", "instrument_elastic_transport__node__base", ) + _process_module_definition( + "elastic_transport._node._base_async", + "newrelic.hooks.datastore_elasticsearch", + "instrument_async_elastic_transport__node__base", + ) # v7 and below _process_module_definition( "elasticsearch.transport", "newrelic.hooks.datastore_elasticsearch", "instrument_elasticsearch_transport" ) + _process_module_definition( + "elasticsearch._async.transport", + "newrelic.hooks.datastore_elasticsearch", + "instrument_async_elasticsearch_transport", + ) # v8 and above _process_module_definition( "elastic_transport._transport", "newrelic.hooks.datastore_elasticsearch", "instrument_elastic_transport__transport", ) + _process_module_definition( + "elastic_transport._async_transport", + "newrelic.hooks.datastore_elasticsearch", + "instrument_async_elastic_transport__transport", + ) _process_module_definition("pika.adapters", "newrelic.hooks.messagebroker_pika", "instrument_pika_adapters") _process_module_definition("pika.channel", "newrelic.hooks.messagebroker_pika", "instrument_pika_channel") diff --git a/newrelic/hooks/datastore_elasticsearch.py b/newrelic/hooks/datastore_elasticsearch.py index 8ddc9277e..6f0ac69b5 100644 --- a/newrelic/hooks/datastore_elasticsearch.py +++ b/newrelic/hooks/datastore_elasticsearch.py @@ -11,8 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from newrelic.api.datastore_trace import DatastoreTrace +from newrelic.api.time_trace import current_trace from newrelic.api.transaction import current_transaction from newrelic.common.object_wrapper import function_wrapper, wrap_function_wrapper from newrelic.common.package_version_utils import get_package_version_tuple @@ -106,12 +106,19 @@ def instrument_es_methods(module, _class, client_methods, prefix=None): wrap_elasticsearch_client_method(module, _class, method_name, arg_extractor, prefix) +def instrument_async_es_methods(module, _class, client_methods, prefix=None): + for method_name, arg_extractor in client_methods: + if hasattr(getattr(module, _class), method_name): + wrap_async_elasticsearch_client_method(module, _class, method_name, arg_extractor, prefix) + + def wrap_elasticsearch_client_method(module, class_name, method_name, arg_extractor, prefix=None): def _nr_wrapper_Elasticsearch_method_(wrapped, instance, args, kwargs): transaction = current_transaction() if transaction is None: return wrapped(*args, **kwargs) + # When index is None, it means there is no target field # associated with this method. Hence this method will only # create an operation metric and no statement metric. 
This is @@ -127,24 +134,70 @@ def _nr_wrapper_Elasticsearch_method_(wrapped, instance, args, kwargs): else: operation = method_name - transaction._nr_datastore_instance_info = (None, None, None) + trace = DatastoreTrace(product="Elasticsearch", target=index, operation=operation, source=wrapped) - dt = DatastoreTrace(product="Elasticsearch", target=index, operation=operation, source=wrapped) - - with dt: + with trace: result = wrapped(*args, **kwargs) - instance_info = transaction._nr_datastore_instance_info - host, port_path_or_id, _ = instance_info + tracer_settings = trace.settings.datastore_tracer - dt.host = host - dt.port_path_or_id = port_path_or_id + if tracer_settings.instance_reporting.enabled: + try: + node_config = result.meta.node + trace.host = node_config.host + port = node_config.port + trace.port_path_or_id = str(port) if port is not None else None + except Exception: + pass return result wrap_function_wrapper(module, f"{class_name}.{method_name}", _nr_wrapper_Elasticsearch_method_) +def wrap_async_elasticsearch_client_method(module, class_name, method_name, arg_extractor, prefix=None): + async def _nr_wrapper_AsyncElasticsearch_method_(wrapped, instance, args, kwargs): + transaction = current_transaction() + + if transaction is None: + return await wrapped(*args, **kwargs) + + # When index is None, it means there is no target field + # associated with this method. Hence this method will only + # create an operation metric and no statement metric. This is + # handled by setting the target to None when calling the + # DatastoreTraceWrapper. + if arg_extractor is None: + index = None + else: + index = arg_extractor(*args, **kwargs) + + if prefix: + operation = f"{prefix}.{method_name}" + else: + operation = method_name + + trace = DatastoreTrace(product="Elasticsearch", target=index, operation=operation, source=wrapped) + + with trace: + result = await wrapped(*args, **kwargs) + + tracer_settings = trace.settings.datastore_tracer + + if tracer_settings.instance_reporting.enabled: + try: + node_config = result.meta.node + trace.host = node_config.host + port = node_config.port + trace.port_path_or_id = str(port) if port is not None else None + except Exception: + pass + + return result + + wrap_function_wrapper(module, f"{class_name}.{method_name}", _nr_wrapper_AsyncElasticsearch_method_) + + _elasticsearch_client_methods_below_v8 = ( ("abort_benchmark", None), ("benchmark", _extract_args_index), @@ -186,7 +239,6 @@ def _nr_wrapper_Elasticsearch_method_(wrapped, instance, args, kwargs): ("update", _extract_args_index), ) - _elasticsearch_client_methods_v8 = ( ("bulk", _extract_args_operations_index), ("clear_scroll", None), @@ -244,6 +296,16 @@ def instrument_elasticsearch_client(module): instrument_es_methods(module, "Elasticsearch", _elasticsearch_client_methods_below_v8) +def instrument_elasticsearch__async_client(module): + # The module path was remapped in v8 to match previous versions. + # In order to avoid double wrapping we check the version before + # wrapping. 
+ if ES_VERSION < (8,): + instrument_async_es_methods(module, "AsyncElasticsearch", _elasticsearch_client_methods_below_v8) + else: + instrument_async_es_methods(module, "AsyncElasticsearch", _elasticsearch_client_methods_v8) + + def instrument_elasticsearch_client_v8(module): instrument_es_methods(module, "Elasticsearch", _elasticsearch_client_methods_v8) @@ -290,7 +352,6 @@ def instrument_elasticsearch_client_v8(module): ("validate_query", _extract_args_index), ) - _elasticsearch_client_indices_methods_v8 = ( ("add_block", _extract_args_index), ("analyze", _extract_args_index), @@ -357,6 +418,16 @@ def instrument_elasticsearch_client_indices(module): instrument_es_methods(module, "IndicesClient", _elasticsearch_client_indices_methods_below_v8, "indices") +def instrument_elasticsearch__async_client_indices(module): + # The module path was remapped in v8 to match previous versions. + # In order to avoid double wrapping we check the version before + # wrapping. + if ES_VERSION < (8,): + instrument_async_es_methods(module, "IndicesClient", _elasticsearch_client_indices_methods_below_v8, "indices") + else: + instrument_async_es_methods(module, "IndicesClient", _elasticsearch_client_indices_methods_v8, "indices") + + def instrument_elasticsearch_client_indices_v8(module): instrument_es_methods(module, "IndicesClient", _elasticsearch_client_indices_methods_v8, "indices") @@ -417,6 +488,16 @@ def instrument_elasticsearch_client_cat(module): instrument_es_methods(module, "CatClient", _elasticsearch_client_cat_methods_below_v8, "cat") +def instrument_elasticsearch__async_client_cat(module): + # The module path was remapped in v8 to match previous versions. + # In order to avoid double wrapping we check the version before + # wrapping. + if ES_VERSION < (8,): + instrument_async_es_methods(module, "CatClient", _elasticsearch_client_cat_methods_below_v8, "cat") + else: + instrument_async_es_methods(module, "CatClient", _elasticsearch_client_cat_methods_v8, "cat") + + def instrument_elasticsearch_client_cat_v8(module): instrument_es_methods(module, "CatClient", _elasticsearch_client_cat_methods_v8, "cat") @@ -431,7 +512,6 @@ def instrument_elasticsearch_client_cat_v8(module): ("stats", None), ) - _elasticsearch_client_cluster_methods_v8 = ( ("allocation_explain", _extract_args_allocation_explain_index), ("delete_component_template", None), @@ -459,6 +539,16 @@ def instrument_elasticsearch_client_cluster(module): instrument_es_methods(module, "ClusterClient", _elasticsearch_client_cluster_methods_below_v8, "cluster") +def instrument_elasticsearch__async_client_cluster(module): + # The module path was remapped in v8 to match previous versions. + # In order to avoid double wrapping we check the version before + # wrapping. 
+ if ES_VERSION < (8,): + instrument_async_es_methods(module, "ClusterClient", _elasticsearch_client_cluster_methods_below_v8, "cluster") + else: + instrument_async_es_methods(module, "ClusterClient", _elasticsearch_client_cluster_methods_v8, "cluster") + + def instrument_elasticsearch_client_cluster_v8(module): instrument_es_methods(module, "ClusterClient", _elasticsearch_client_cluster_methods_v8, "cluster") @@ -469,6 +559,7 @@ def instrument_elasticsearch_client_cluster_v8(module): ("shutdown", None), ("stats", None), ) + _elasticsearch_client_nodes_methods_v8 = ( ("clear_repositories_metering_archive", None), ("get_repositories_metering_info", None), @@ -488,6 +579,16 @@ def instrument_elasticsearch_client_nodes(module): instrument_es_methods(module, "NodesClient", _elasticsearch_client_nodes_methods_below_v8, "nodes") +def instrument_elasticsearch__async_client_nodes(module): + # The module path was remapped in v8 to match previous versions. + # In order to avoid double wrapping we check the version before + # wrapping. + if ES_VERSION < (8,): + instrument_async_es_methods(module, "NodesClient", _elasticsearch_client_nodes_methods_below_v8, "nodes") + else: + instrument_async_es_methods(module, "NodesClient", _elasticsearch_client_nodes_methods_v8, "nodes") + + def instrument_elasticsearch_client_nodes_v8(module): instrument_es_methods(module, "NodesClient", _elasticsearch_client_nodes_methods_v8, "nodes") @@ -503,6 +604,7 @@ def instrument_elasticsearch_client_nodes_v8(module): ("status", None), ("verify_repository", None), ) + _elasticsearch_client_snapshot_methods_v8 = ( ("cleanup_repository", None), ("clone", None), @@ -526,6 +628,18 @@ def instrument_elasticsearch_client_snapshot(module): instrument_es_methods(module, "SnapshotClient", _elasticsearch_client_snapshot_methods_below_v8, "snapshot") +def instrument_elasticsearch__async_client_snapshot(module): + # The module path was remapped in v8 to match previous versions. + # In order to avoid double wrapping we check the version before + # wrapping. + if ES_VERSION < (8,): + instrument_async_es_methods( + module, "SnapshotClient", _elasticsearch_client_snapshot_methods_below_v8, "snapshot" + ) + else: + instrument_async_es_methods(module, "SnapshotClient", _elasticsearch_client_snapshot_methods_v8, "snapshot") + + def instrument_elasticsearch_client_snapshot_v8(module): instrument_es_methods(module, "SnapshotClient", _elasticsearch_client_snapshot_methods_v8, "snapshot") @@ -541,6 +655,14 @@ def instrument_elasticsearch_client_tasks(module): instrument_es_methods(module, "TasksClient", _elasticsearch_client_tasks_methods, "tasks") +def instrument_elasticsearch__async_client_tasks(module): + # The module path was remapped in v8 to match previous versions. + # In order to avoid double wrapping we check the version before + # wrapping. + if ES_VERSION < (8,): + instrument_async_es_methods(module, "TasksClient", _elasticsearch_client_tasks_methods, "tasks") + + def instrument_elasticsearch_client_tasks_v8(module): instrument_es_methods(module, "TasksClient", _elasticsearch_client_tasks_methods, "tasks") @@ -570,6 +692,16 @@ def instrument_elasticsearch_client_ingest(module): instrument_es_methods(module, "IngestClient", _elasticsearch_client_ingest_methods_below_v8, "ingest") +def instrument_elasticsearch__async_client_ingest(module): + # The module path was remapped in v8 to match previous versions. + # In order to avoid double wrapping we check the version before + # wrapping. 
+ if ES_VERSION < (8,): + instrument_async_es_methods(module, "IngestClient", _elasticsearch_client_ingest_methods_below_v8, "ingest") + else: + instrument_async_es_methods(module, "IngestClient", _elasticsearch_client_ingest_methods_v8, "ingest") + + def instrument_elasticsearch_client_ingest_v8(module): instrument_es_methods(module, "IngestClient", _elasticsearch_client_ingest_methods_v8, "ingest") @@ -592,42 +724,74 @@ def _bind_params(host="localhost", port=9200, *args, **kwargs): return wrapped(*args, **kwargs) +def _nr__AsyncConnection__init__wrapper(wrapped, instance, args, kwargs): + """Cache datastore instance info on Connection object""" + + def _bind_params(host="localhost", port=9200, *args, **kwargs): + return host, port + + host, port = _bind_params(*args, **kwargs) + port = str(port) + instance._nr_host_port = (host, port) + + return wrapped(*args, **kwargs) + + def instrument_elasticsearch_connection_base(module): wrap_function_wrapper(module, "Connection.__init__", _nr_Connection__init__wrapper) +def instrument_async_elasticsearch_connection_base(module): + wrap_function_wrapper(module, "AsyncConnection.__init__", _nr__AsyncConnection__init__wrapper) + + def BaseNode__init__wrapper(wrapped, instance, args, kwargs): result = wrapped(*args, **kwargs) instance._nr_host_port = (instance.host, str(instance.port)) return result +def BaseAsyncNode__init__wrapper(wrapped, instance, args, kwargs): + result = wrapped(*args, **kwargs) + instance._nr_host_port = (instance.host, str(instance.port)) + return result + + def instrument_elastic_transport__node__base(module): if hasattr(module, "BaseNode"): wrap_function_wrapper(module, "BaseNode.__init__", BaseNode__init__wrapper) +def instrument_async_elastic_transport__node__base(module): + if hasattr(module, "BaseAsyncNode"): + wrap_function_wrapper(module, "BaseAsyncNode.__init__", BaseAsyncNode__init__wrapper) + + def _nr_get_connection_wrapper(wrapped, instance, args, kwargs): """Read instance info from Connection and stash on Transaction.""" - transaction = current_transaction() + # Instance info provided in request metadata for v8 + if ES_VERSION >= (8,): + return wrapped(*args, **kwargs) - if transaction is None: + trace = current_trace() + + if trace is None or not isinstance(trace, DatastoreTrace): return wrapped(*args, **kwargs) conn = wrapped(*args, **kwargs) - instance_info = (None, None, None) + host = port_path_or_id = "unknown" try: - tracer_settings = transaction.settings.datastore_tracer + tracer_settings = trace.settings.datastore_tracer if tracer_settings.instance_reporting.enabled: host, port_path_or_id = conn._nr_host_port - instance_info = (host, port_path_or_id, None) except Exception: - instance_info = ("unknown", "unknown", None) + pass - transaction._nr_datastore_instance_info = instance_info + trace.host = host + trace.port_path_or_id = port_path_or_id return conn @@ -647,11 +811,35 @@ def _nr_perform_request_wrapper(wrapped, instance, args, kwargs): return wrapped(*args, **kwargs) +async def _nr_async_perform_request_wrapper(wrapped, instance, args, kwargs): + """Read instance info from Async Connection and stash on Transaction.""" + transaction = current_transaction() + + if transaction is None: + return await wrapped(*args, **kwargs) + + if not hasattr(instance.node_pool.get, "_nr_wrapped"): + instance.node_pool.get = function_wrapper(_nr_get_connection_wrapper)(instance.node_pool.get) + instance.node_pool.get._nr_wrapped = True + + return await wrapped(*args, **kwargs) + + def 
instrument_elasticsearch_transport(module): if hasattr(module, "Transport") and hasattr(module.Transport, "get_connection"): wrap_function_wrapper(module, "Transport.get_connection", _nr_get_connection_wrapper) +def instrument_async_elasticsearch_transport(module): + if hasattr(module, "AsyncTransport") and hasattr(module.AsyncTransport, "get_connection"): + wrap_function_wrapper(module, "AsyncTransport.get_connection", _nr_get_connection_wrapper) + + def instrument_elastic_transport__transport(module): if hasattr(module, "Transport") and hasattr(module.Transport, "perform_request"): wrap_function_wrapper(module, "Transport.perform_request", _nr_perform_request_wrapper) + + +def instrument_async_elastic_transport__transport(module): + if hasattr(module, "AsyncTransport") and hasattr(module.AsyncTransport, "perform_request"): + wrap_function_wrapper(module, "AsyncTransport.perform_request", _nr_async_perform_request_wrapper) diff --git a/tests/datastore_elasticsearch/conftest.py b/tests/datastore_elasticsearch/conftest.py index 5e026b842..666f1f884 100644 --- a/tests/datastore_elasticsearch/conftest.py +++ b/tests/datastore_elasticsearch/conftest.py @@ -14,9 +14,10 @@ import pytest from testing_support.db_settings import elasticsearch_settings +from testing_support.fixture.event_loop import event_loop as loop from testing_support.fixtures import collector_agent_registration_fixture, collector_available_fixture -from newrelic.common.package_version_utils import get_package_version +from newrelic.common.package_version_utils import get_package_version_tuple _default_settings = { "package_reporting.enabled": False, # Turn off package reporting for testing as it causes slow downs. @@ -33,14 +34,31 @@ linked_applications=["Python Agent Test (datastore)"], ) -ES_VERSION = tuple([int(n) for n in get_package_version("elasticsearch").split(".")]) ES_SETTINGS = elasticsearch_settings()[0] ES_MULTIPLE_SETTINGS = elasticsearch_settings() ES_URL = f"http://{ES_SETTINGS['host']}:{ES_SETTINGS['port']}" +ES_VERSION = get_package_version_tuple("elasticsearch") +IS_V8_OR_ABOVE = ES_VERSION >= (8,) +IS_V7_OR_BELOW = not IS_V8_OR_ABOVE +RUN_IF_V8_OR_ABOVE = pytest.mark.skipif(not IS_V8_OR_ABOVE, reason="Unsupported for elasticsearch<8") +RUN_IF_V7_OR_BELOW = pytest.mark.skipif(not IS_V7_OR_BELOW, reason="Unsupported for elasticsearch>=8") -@pytest.fixture(scope="session") + +@pytest.fixture(scope="function") def client(): from elasticsearch import Elasticsearch - return Elasticsearch(ES_URL) + _client = Elasticsearch(ES_URL) + yield _client + _client.close() + + +@pytest.fixture(scope="function") +def async_client(loop): + from elasticsearch import AsyncElasticsearch + + # Close the client manually instead of using an async context manager + _async_client = AsyncElasticsearch(ES_URL) + yield _async_client + loop.run_until_complete(_async_client.close()) diff --git a/tests/datastore_elasticsearch/test_async_connection.py b/tests/datastore_elasticsearch/test_async_connection.py new file mode 100644 index 000000000..69eec2efb --- /dev/null +++ b/tests/datastore_elasticsearch/test_async_connection.py @@ -0,0 +1,63 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from conftest import ES_SETTINGS, ES_VERSION + +try: + # v8+ + from elastic_transport._models import NodeConfig + from elastic_transport._node._base_async import BaseAsyncNode as AsyncConnection +except ImportError: + # v7 + from elasticsearch._async.http_aiohttp import AsyncConnection + +HOST = {"scheme": "http", "host": ES_SETTINGS["host"], "port": int(ES_SETTINGS["port"])} + +IS_V8 = ES_VERSION >= (8,) +SKIP_IF_V7 = pytest.mark.skipif(not IS_V8, reason="Skipping v8 tests.") +SKIP_IF_V8 = pytest.mark.skipif(IS_V8, reason="Skipping v7 tests.") + + +def test_connection_default(): + if IS_V8: + conn = AsyncConnection(NodeConfig(**HOST)) + else: + conn = AsyncConnection(**HOST) + + assert conn._nr_host_port == (ES_SETTINGS["host"], ES_SETTINGS["port"]) + + +@SKIP_IF_V7 +def test_connection_config(): + conn = AsyncConnection(NodeConfig(scheme="http", host="foo", port=8888)) + assert conn._nr_host_port == ("foo", "8888") + + +@SKIP_IF_V8 +def test_connection_host_arg(): + conn = AsyncConnection("the_host") + assert conn._nr_host_port == ("the_host", "9200") + + +@SKIP_IF_V8 +def test_connection_args(): + conn = AsyncConnection("the_host", 9999) + assert conn._nr_host_port == ("the_host", "9999") + + +@SKIP_IF_V8 +def test_connection_kwargs(): + conn = AsyncConnection(host="foo", port=8888) + assert conn._nr_host_port == ("foo", "8888") diff --git a/tests/datastore_elasticsearch/test_async_database_duration.py b/tests/datastore_elasticsearch/test_async_database_duration.py new file mode 100644 index 000000000..5f5be8e1e --- /dev/null +++ b/tests/datastore_elasticsearch/test_async_database_duration.py @@ -0,0 +1,69 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import sqlite3 + +import pytest +from conftest import ES_VERSION +from testing_support.validators.validate_database_duration import validate_database_duration + +from newrelic.api.background_task import background_task + + +async def _exercise_es_v7(es): + await es.index( + index="contacts", doc_type="person", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1 + ) + await es.index( + index="contacts", doc_type="person", body={"name": "Jessica Coder", "age": 32, "title": "Programmer"}, id=2 + ) + await es.index( + index="contacts", doc_type="person", body={"name": "Freddy Tester", "age": 29, "title": "Assistant"}, id=3 + ) + await es.indices.refresh("contacts") + + +async def _exercise_es_v8(es): + await es.index(index="contacts", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1) + await es.index(index="contacts", body={"name": "Jessica Coder", "age": 32, "title": "Programmer"}, id=2) + await es.index(index="contacts", body={"name": "Freddy Tester", "age": 29, "title": "Assistant"}, id=3) + await es.indices.refresh(index="contacts") + + +_exercise_es = _exercise_es_v7 if ES_VERSION < (8, 0, 0) else _exercise_es_v8 + + +@validate_database_duration() +@background_task() +def test_elasticsearch_database_duration(loop, async_client): + loop.run_until_complete(_exercise_es(async_client)) + + +@validate_database_duration() +@background_task() +def test_elasticsearch_and_sqlite_database_duration(loop, async_client): + # Make Elasticsearch queries + + loop.run_until_complete(_exercise_es(async_client)) + + # Make sqlite queries + + conn = sqlite3.connect(":memory:") + cur = conn.cursor() + + cur.execute("CREATE TABLE people (name text, age int)") + cur.execute("INSERT INTO people VALUES ('Bob', 22)") + + conn.commit() + conn.close() diff --git a/tests/datastore_elasticsearch/test_async_elasticsearch.py b/tests/datastore_elasticsearch/test_async_elasticsearch.py new file mode 100644 index 000000000..5002d71f5 --- /dev/null +++ b/tests/datastore_elasticsearch/test_async_elasticsearch.py @@ -0,0 +1,197 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from conftest import ES_SETTINGS, IS_V8_OR_ABOVE +from elasticsearch._async import client +from testing_support.fixture.event_loop import event_loop as loop +from testing_support.fixtures import override_application_settings +from testing_support.util import instance_hostname +from testing_support.validators.validate_transaction_errors import validate_transaction_errors +from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics + +from newrelic.api.background_task import background_task + +# Settings + +_enable_instance_settings = {"datastore_tracer.instance_reporting.enabled": True} +_disable_instance_settings = {"datastore_tracer.instance_reporting.enabled": False} + +# Metrics + +_base_scoped_metrics = [ + ("Datastore/operation/Elasticsearch/cat.health", 1), + ("Datastore/operation/Elasticsearch/nodes.info", 1), + ("Datastore/operation/Elasticsearch/snapshot.status", 1), + ("Datastore/statement/Elasticsearch/_all/cluster.health", 1), + ("Datastore/statement/Elasticsearch/_all/search", 2), + ("Datastore/statement/Elasticsearch/address/index", 2), + ("Datastore/statement/Elasticsearch/address/search", 1), + ("Datastore/statement/Elasticsearch/contacts/index", 3), + ("Datastore/statement/Elasticsearch/contacts/indices.refresh", 1), + ("Datastore/statement/Elasticsearch/contacts/search", 2), + ("Datastore/statement/Elasticsearch/other/search", 2), +] + +_all_count = 17 +_base_rollup_metrics = [ + ("Datastore/all", _all_count), + ("Datastore/allOther", _all_count), + ("Datastore/Elasticsearch/all", _all_count), + ("Datastore/Elasticsearch/allOther", _all_count), + ("Datastore/operation/Elasticsearch/cat.health", 1), + ("Datastore/operation/Elasticsearch/cluster.health", 1), + ("Datastore/operation/Elasticsearch/index", 5), + ("Datastore/operation/Elasticsearch/indices.refresh", 1), + ("Datastore/operation/Elasticsearch/nodes.info", 1), + ("Datastore/operation/Elasticsearch/search", 7), + ("Datastore/operation/Elasticsearch/snapshot.status", 1), + ("Datastore/statement/Elasticsearch/_all/cluster.health", 1), + ("Datastore/statement/Elasticsearch/_all/search", 2), + ("Datastore/statement/Elasticsearch/address/index", 2), + ("Datastore/statement/Elasticsearch/address/search", 1), + ("Datastore/statement/Elasticsearch/contacts/index", 3), + ("Datastore/statement/Elasticsearch/contacts/indices.refresh", 1), + ("Datastore/statement/Elasticsearch/contacts/search", 2), + ("Datastore/statement/Elasticsearch/other/search", 2), +] + +# Version support + + +def is_importable(module_path): + try: + __import__(module_path) + return True + except ImportError: + return False + + +# Instance info + +_disable_scoped_metrics = list(_base_scoped_metrics) +_disable_rollup_metrics = list(_base_rollup_metrics) + +_enable_scoped_metrics = list(_base_scoped_metrics) +_enable_rollup_metrics = list(_base_rollup_metrics) + +_host = instance_hostname(ES_SETTINGS["host"]) +_port = ES_SETTINGS["port"] + +_instance_metric_name = f"Datastore/instance/Elasticsearch/{_host}/{_port}" + +_enable_rollup_metrics.append((_instance_metric_name, _all_count)) + +_disable_rollup_metrics.append((_instance_metric_name, None)) + +# Query + + +async def _exercise_es_v7(es): + await es.index( + index="contacts", doc_type="person", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1 + ) + await es.index( + index="contacts", doc_type="person", body={"name": "Jessica Coder", "age": 32, "title": "Programmer"}, id=2 + ) + await es.index( + index="contacts", doc_type="person", 
body={"name": "Freddy Tester", "age": 29, "title": "Assistant"}, id=3 + ) + await es.indices.refresh("contacts") + await es.index( + index="address", doc_type="employee", body={"name": "Sherlock", "address": "221B Baker Street, London"}, id=1 + ) + await es.index( + index="address", + doc_type="employee", + body={"name": "Bilbo", "address": "Bag End, Bagshot row, Hobbiton, Shire"}, + id=2, + ) + await es.search(index="contacts", q="name:Joe") + await es.search(index="contacts", q="name:jessica") + await es.search(index="address", q="name:Sherlock") + await es.search(index=["contacts", "address"], q="name:Bilbo") + await es.search(index="contacts,address", q="name:Bilbo") + await es.search(index="*", q="name:Bilbo") + await es.search(q="name:Bilbo") + await es.cluster.health() + + if hasattr(es, "cat"): + await es.cat.health() + if hasattr(es, "nodes"): + await es.nodes.info() + if hasattr(es, "snapshot") and hasattr(es.snapshot, "status"): + await es.snapshot.status() + if hasattr(es.indices, "status"): + await es.indices.status() + + +async def _exercise_es_v8(es): + await es.index(index="contacts", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1) + await es.index(index="contacts", body={"name": "Jessica Coder", "age": 32, "title": "Programmer"}, id=2) + await es.index(index="contacts", body={"name": "Freddy Tester", "age": 29, "title": "Assistant"}, id=3) + await es.indices.refresh(index="contacts") + await es.index(index="address", body={"name": "Sherlock", "address": "221B Baker Street, London"}, id=1) + await es.index(index="address", body={"name": "Bilbo", "address": "Bag End, Bagshot row, Hobbiton, Shire"}, id=2) + await es.search(index="contacts", q="name:Joe") + await es.search(index="contacts", q="name:jessica") + await es.search(index="address", q="name:Sherlock") + await es.search(index=["contacts", "address"], q="name:Bilbo") + await es.search(index="contacts,address", q="name:Bilbo") + await es.search(index="*", q="name:Bilbo") + await es.search(q="name:Bilbo") + await es.cluster.health() + + if hasattr(es, "cat"): + await es.cat.health() + if hasattr(es, "nodes"): + await es.nodes.info() + if hasattr(es, "snapshot") and hasattr(es.snapshot, "status"): + await es.snapshot.status() + if hasattr(es.indices, "status"): + await es.indices.status() + + +_exercise_es = _exercise_es_v8 if IS_V8_OR_ABOVE else _exercise_es_v7 + + +# Test + + +@validate_transaction_errors(errors=[]) +@validate_transaction_metrics( + "test_async_elasticsearch:test_async_elasticsearch_operation_disabled", + scoped_metrics=_disable_scoped_metrics, + rollup_metrics=_disable_rollup_metrics, + background_task=True, +) +@override_application_settings(_disable_instance_settings) +@background_task() +def test_async_elasticsearch_operation_disabled(async_client, loop): + loop.run_until_complete(_exercise_es(async_client)) + + +@validate_transaction_errors(errors=[]) +@validate_transaction_metrics( + "test_async_elasticsearch:test_async_elasticsearch_operation_enabled", + scoped_metrics=_enable_scoped_metrics, + rollup_metrics=_enable_rollup_metrics, + background_task=True, +) +@override_application_settings(_enable_instance_settings) +@background_task() +def test_async_elasticsearch_operation_enabled(async_client, loop): + loop.run_until_complete(_exercise_es(async_client)) + + +def test_async_elasticsearch_no_transaction(async_client, loop): + loop.run_until_complete(_exercise_es(async_client)) diff --git a/tests/datastore_elasticsearch/test_async_instrumented_methods.py 
b/tests/datastore_elasticsearch/test_async_instrumented_methods.py new file mode 100644 index 000000000..c6c771ba3 --- /dev/null +++ b/tests/datastore_elasticsearch/test_async_instrumented_methods.py @@ -0,0 +1,130 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import elasticsearch +import elasticsearch._async.client as async_client +import pytest +from conftest import ES_VERSION +from testing_support.validators.validate_datastore_trace_inputs import validate_datastore_trace_inputs + +from newrelic.api.background_task import background_task + +RUN_IF_V8 = pytest.mark.skipif( + ES_VERSION < (8,), reason="Only run for v8+. We don't support all methods in previous versions." +) + + +@pytest.mark.parametrize( + "sub_module,method,args,kwargs,expected_index", + [ + (None, "exists", (), {"index": "contacts", "id": 1}, "contacts"), + (None, "info", (), {}, None), + pytest.param( + None, + "msearch", + (), + {"searches": [{}, {"query": {"match": {"message": "this is a test"}}}], "index": "contacts"}, + "contacts", + marks=RUN_IF_V8, + ), + ("indices", "exists", (), {"index": "contacts"}, "contacts"), + ("indices", "exists_template", (), {"name": "no-exist"}, None), + ("cat", "count", (), {"index": "contacts"}, "contacts"), + ("cat", "health", (), {}, None), + pytest.param( + "cluster", + "allocation_explain", + (), + {"index": "contacts", "shard": 0, "primary": True}, + "contacts", + marks=RUN_IF_V8, + ), + ("cluster", "get_settings", (), {}, None), + ("cluster", "health", (), {"index": "contacts"}, "contacts"), + ("nodes", "info", (), {}, None), + ("snapshot", "status", (), {}, None), + ("tasks", "list", (), {}, None), + ("ingest", "geo_ip_stats", (), {}, None), + ], +) +def test_method_on_async_client_datastore_trace_inputs( + loop, async_client, sub_module, method, args, kwargs, expected_index +): + expected_operation = f"{sub_module}.{method}" if sub_module else method + + @validate_datastore_trace_inputs(target=expected_index, operation=expected_operation) + @background_task() + async def _test(): + if not sub_module: + await getattr(async_client, method)(*args, **kwargs) + else: + await getattr(getattr(async_client, sub_module), method)(*args, **kwargs) + + loop.run_until_complete(_test()) + + +def _test_methods_wrapped(_object, ignored_methods=None): + if not ignored_methods: + ignored_methods = {"perform_request", "transport"} + + def is_wrapped(m): + return hasattr(getattr(_object, m), "__wrapped__") + + methods = {m for m in dir(_object) if not m[0] == "_"} + uninstrumented = {m for m in (methods - ignored_methods) if not is_wrapped(m)} + assert not uninstrumented, f"There are uninstrumented methods: {uninstrumented}" + + +@RUN_IF_V8 +def test_async_instrumented_methods_client(): + _test_methods_wrapped(elasticsearch.AsyncElasticsearch) + + +@RUN_IF_V8 +def test_instrumented_methods_client_indices(): + _test_methods_wrapped(async_client.IndicesClient) + + +@RUN_IF_V8 +def test_instrumented_methods_client_cluster(): + 
_test_methods_wrapped(async_client.ClusterClient) + + +@RUN_IF_V8 +def test_instrumented_methods_client_cat(): + if hasattr(async_client, "CatClient"): + _test_methods_wrapped(async_client.CatClient) + + +@RUN_IF_V8 +def test_instrumented_methods_client_nodes(): + if hasattr(async_client, "NodesClient"): + _test_methods_wrapped(async_client.NodesClient) + + +@RUN_IF_V8 +def test_instrumented_methods_client_snapshot(): + if hasattr(async_client, "SnapshotClient"): + _test_methods_wrapped(async_client.SnapshotClient) + + +@RUN_IF_V8 +def test_instrumented_methods_client_tasks(): + if hasattr(async_client, "TasksClient"): + _test_methods_wrapped(async_client.TasksClient) + + +@RUN_IF_V8 +def test_instrumented_methods_client_ingest(): + if hasattr(async_client, "IngestClient"): + _test_methods_wrapped(async_client.IngestClient) diff --git a/tests/datastore_elasticsearch/test_async_mget.py b/tests/datastore_elasticsearch/test_async_mget.py new file mode 100644 index 000000000..4181ab647 --- /dev/null +++ b/tests/datastore_elasticsearch/test_async_mget.py @@ -0,0 +1,150 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from conftest import ES_MULTIPLE_SETTINGS, ES_VERSION +from elasticsearch import AsyncElasticsearch +from testing_support.fixture.event_loop import event_loop as loop # noqa: F401 +from testing_support.fixtures import override_application_settings +from testing_support.util import instance_hostname +from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics + +from newrelic.api.background_task import background_task + +try: + # v8+ + from elastic_transport import RoundRobinSelector +except ImportError: + # v7 + from elasticsearch.connection_pool import RoundRobinSelector + +# Settings + +_enable_instance_settings = {"datastore_tracer.instance_reporting.enabled": True} +_disable_instance_settings = {"datastore_tracer.instance_reporting.enabled": False} + +# Metrics + +_base_scoped_metrics = (("Datastore/statement/Elasticsearch/contacts/index", 2),) + +_base_rollup_metrics = ( + ("Datastore/all", 3), + ("Datastore/allOther", 3), + ("Datastore/Elasticsearch/all", 3), + ("Datastore/Elasticsearch/allOther", 3), + ("Datastore/operation/Elasticsearch/index", 2), + ("Datastore/operation/Elasticsearch/mget", 1), + ("Datastore/statement/Elasticsearch/contacts/index", 2), +) + +_disable_scoped_metrics = list(_base_scoped_metrics) +_disable_rollup_metrics = list(_base_rollup_metrics) + +_enable_scoped_metrics = list(_base_scoped_metrics) +_enable_rollup_metrics = list(_base_rollup_metrics) + +if len(ES_MULTIPLE_SETTINGS) > 1: + es_1 = ES_MULTIPLE_SETTINGS[0] + es_2 = ES_MULTIPLE_SETTINGS[1] + + host_1 = instance_hostname(es_1["host"]) + port_1 = es_1["port"] + + host_2 = instance_hostname(es_2["host"]) + port_2 = es_2["port"] + + instance_metric_name_1 = f"Datastore/instance/Elasticsearch/{host_1}/{port_1}" + instance_metric_name_2 = f"Datastore/instance/Elasticsearch/{host_2}/{port_2}" + + if ES_VERSION >= 
(8,): + _enable_rollup_metrics.extend([(instance_metric_name_1, 2), (instance_metric_name_2, 1)]) + else: + # Cannot deterministically set the number of calls to each instance as it's random + # which node is selected first and called twice. Instead, check that both metrics are simply present. + _enable_rollup_metrics.extend([(instance_metric_name_1, "present"), (instance_metric_name_2, "present")]) + + _disable_rollup_metrics.extend([(instance_metric_name_1, None), (instance_metric_name_2, None)]) + + +@pytest.fixture(scope="module") +def client(loop): + urls = [f"http://{db['host']}:{db['port']}" for db in ES_MULTIPLE_SETTINGS] + # When selecting a connection from the pool, use the round robin method. + # This is actually the default already. Using round robin will ensure that + # doing two db calls will mean Elasticsearch is talking to two different + # dbs. + if ES_VERSION >= (8,): + _client = AsyncElasticsearch(urls, node_selector_class=RoundRobinSelector, randomize_nodes_in_pool=False) + else: + _client = AsyncElasticsearch(urls, selector_class=RoundRobinSelector, randomize_nodes_in_pool=False) + + yield _client + loop.run_until_complete(_client.close()) + + +# Query + + +async def _exercise_es_multi(es): + # set on db 1 + if ES_VERSION >= (8,): + await es.index(index="contacts", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1) + # set on db 2 + await es.index(index="contacts", body={"name": "Jane Tester", "age": 22, "title": "Senior QA Engineer"}, id=2) + else: + await es.index( + index="contacts", doc_type="person", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1 + ) + # set on db 2 + await es.index( + index="contacts", + doc_type="person", + body={"name": "Jane Tester", "age": 22, "title": "Senior QA Engineer"}, + id=2, + ) + + # ask db 1, will return info from db 1 and 2 + mget_body = {"docs": [{"_id": 1, "_index": "contacts"}, {"_id": 2, "_index": "contacts"}]} + + results = await es.mget(body=mget_body) + assert len(results["docs"]) == 2 + + +# Test + + +@pytest.mark.skipif(len(ES_MULTIPLE_SETTINGS) < 2, reason="Test environment not configured with multiple databases.") +@override_application_settings(_enable_instance_settings) +@validate_transaction_metrics( + "test_async_mget:test_async_multi_get_enabled", + scoped_metrics=_enable_scoped_metrics, + rollup_metrics=_enable_rollup_metrics, + background_task=True, +) +@background_task() +def test_async_multi_get_enabled(client, loop): + loop.run_until_complete(_exercise_es_multi(client)) + + +@pytest.mark.skipif(len(ES_MULTIPLE_SETTINGS) < 2, reason="Test environment not configured with multiple databases.") +@override_application_settings(_disable_instance_settings) +@validate_transaction_metrics( + "test_async_mget:test_async_multi_get_disabled", + scoped_metrics=_disable_scoped_metrics, + rollup_metrics=_disable_rollup_metrics, + background_task=True, +) +@background_task() +def test_async_multi_get_disabled(client, loop): + loop.run_until_complete(_exercise_es_multi(client)) diff --git a/tests/datastore_elasticsearch/test_async_multiple_dbs.py b/tests/datastore_elasticsearch/test_async_multiple_dbs.py new file mode 100644 index 000000000..3e75a6285 --- /dev/null +++ b/tests/datastore_elasticsearch/test_async_multiple_dbs.py @@ -0,0 +1,123 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from conftest import ES_MULTIPLE_SETTINGS, ES_VERSION +from elasticsearch import AsyncElasticsearch +from testing_support.fixture.event_loop import event_loop as loop +from testing_support.fixtures import override_application_settings +from testing_support.util import instance_hostname +from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics + +from newrelic.api.background_task import background_task + +# Settings + +_enable_instance_settings = {"datastore_tracer.instance_reporting.enabled": True} +_disable_instance_settings = {"datastore_tracer.instance_reporting.enabled": False} + +# Metrics + +_base_scoped_metrics = (("Datastore/statement/Elasticsearch/contacts/index", 2),) + +_base_rollup_metrics = ( + ("Datastore/all", 2), + ("Datastore/allOther", 2), + ("Datastore/Elasticsearch/all", 2), + ("Datastore/Elasticsearch/allOther", 2), + ("Datastore/operation/Elasticsearch/index", 2), + ("Datastore/statement/Elasticsearch/contacts/index", 2), +) + +_disable_scoped_metrics = list(_base_scoped_metrics) +_disable_rollup_metrics = list(_base_rollup_metrics) + +_enable_scoped_metrics = list(_base_scoped_metrics) +_enable_rollup_metrics = list(_base_rollup_metrics) + +if len(ES_MULTIPLE_SETTINGS) > 1: + es_1 = ES_MULTIPLE_SETTINGS[0] + es_2 = ES_MULTIPLE_SETTINGS[1] + + host_1 = instance_hostname(es_1["host"]) + port_1 = es_1["port"] + + host_2 = instance_hostname(es_2["host"]) + port_2 = es_2["port"] + + instance_metric_name_1 = f"Datastore/instance/Elasticsearch/{host_1}/{port_1}" + instance_metric_name_2 = f"Datastore/instance/Elasticsearch/{host_2}/{port_2}" + + _enable_rollup_metrics.extend([(instance_metric_name_1, 1), (instance_metric_name_2, 1)]) + + _disable_rollup_metrics.extend([(instance_metric_name_1, None), (instance_metric_name_2, None)]) + +# Query + + +async def _exercise_es(es): + if ES_VERSION >= (8,): + await es.index(index="contacts", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1) + else: + await es.index( + index="contacts", doc_type="person", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1 + ) + + +# Test + + +@pytest.fixture(scope="session") +def async_clients(loop): + clients = [] + for db in ES_MULTIPLE_SETTINGS: + es_url = f"http://{db['host']}:{db['port']}" + clients.append(AsyncElasticsearch(es_url)) + + yield clients + + for client in clients: + loop.run_until_complete(client.close()) + + +@pytest.mark.skipif(len(ES_MULTIPLE_SETTINGS) < 2, reason="Test environment not configured with multiple databases.") +@override_application_settings(_enable_instance_settings) +@validate_transaction_metrics( + "test_async_multiple_dbs:test_async_multiple_dbs_enabled", + scoped_metrics=_enable_scoped_metrics, + rollup_metrics=_enable_rollup_metrics, + background_task=True, +) +@background_task() +def test_async_multiple_dbs_enabled(loop, async_clients): + import asyncio + + # Run multiple queries in parallel + loop.run_until_complete(asyncio.gather(*(_exercise_es(client) for client in async_clients))) + + +@pytest.mark.skipif(len(ES_MULTIPLE_SETTINGS) < 2, reason="Test 
environment not configured with multiple databases.") +@override_application_settings(_disable_instance_settings) +@validate_transaction_metrics( + "test_async_multiple_dbs:test_async_multiple_dbs_disabled", + scoped_metrics=_disable_scoped_metrics, + rollup_metrics=_disable_rollup_metrics, + background_task=True, +) +@background_task() +def test_async_multiple_dbs_disabled(loop, async_clients): + import asyncio + + # Run multiple queries in parallel + loop.run_until_complete(asyncio.gather(*(_exercise_es(client) for client in async_clients))) diff --git a/tests/datastore_elasticsearch/test_async_trace_node.py b/tests/datastore_elasticsearch/test_async_trace_node.py new file mode 100644 index 000000000..c503c7c06 --- /dev/null +++ b/tests/datastore_elasticsearch/test_async_trace_node.py @@ -0,0 +1,91 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from conftest import ES_SETTINGS, ES_VERSION +from testing_support.fixture.event_loop import event_loop as loop # noqa: F401 +from testing_support.fixtures import override_application_settings, validate_tt_parenting +from testing_support.util import instance_hostname +from testing_support.validators.validate_tt_collector_json import validate_tt_collector_json + +from newrelic.api.background_task import background_task + +# Settings + +_enable_instance_settings = { + "datastore_tracer.instance_reporting.enabled": True, + "datastore_tracer.database_name_reporting.enabled": True, +} +_disable_instance_settings = { + "datastore_tracer.instance_reporting.enabled": False, + "datastore_tracer.database_name_reporting.enabled": False, +} +_instance_only_settings = { + "datastore_tracer.instance_reporting.enabled": True, + "datastore_tracer.database_name_reporting.enabled": False, +} + +# Expected parameters + +_enabled_required = {"host": instance_hostname(ES_SETTINGS["host"]), "port_path_or_id": str(ES_SETTINGS["port"])} +_enabled_forgone = {"db.instance": "VALUE NOT USED"} + +_disabled_required = {} +_disabled_forgone = {"host": "VALUE NOT USED", "port_path_or_id": "VALUE NOT USED", "db.instance": "VALUE NOT USED"} + +_instance_only_required = {"host": instance_hostname(ES_SETTINGS["host"]), "port_path_or_id": str(ES_SETTINGS["port"])} +_instance_only_forgone = {"db.instance": "VALUE NOT USED"} + +_tt_parenting = ("TransactionNode", [("DatastoreNode", [])]) + + +# Query + + +async def _exercise_es_v7(es): + await es.index( + index="contacts", doc_type="person", body={"name": "Joe Tester", "age": 25, "title": "QA Master"}, id=1 + ) + + +async def _exercise_es_v8(es): + await es.index(index="contacts", body={"name": "Joe Tester", "age": 25, "title": "QA Master"}, id=1) + + +_exercise_es = _exercise_es_v7 if ES_VERSION < (8, 0, 0) else _exercise_es_v8 + +# Tests + + +@override_application_settings(_enable_instance_settings) +@validate_tt_collector_json(datastore_params=_enabled_required, datastore_forgone_params=_enabled_forgone) +@validate_tt_parenting(_tt_parenting) +@background_task() +def 
test_async_trace_node_datastore_params_enable_instance(async_client, loop): + loop.run_until_complete(_exercise_es(async_client)) + + +@override_application_settings(_disable_instance_settings) +@validate_tt_collector_json(datastore_params=_disabled_required, datastore_forgone_params=_disabled_forgone) +@validate_tt_parenting(_tt_parenting) +@background_task() +def test_async_trace_node_datastore_params_disable_instance(async_client, loop): + loop.run_until_complete(_exercise_es(async_client)) + + +@override_application_settings(_instance_only_settings) +@validate_tt_collector_json(datastore_params=_instance_only_required, datastore_forgone_params=_instance_only_forgone) +@validate_tt_parenting(_tt_parenting) +@background_task() +def test_async_trace_node_datastore_params_instance_only(async_client, loop): + loop.run_until_complete(_exercise_es(async_client)) diff --git a/tests/datastore_elasticsearch/test_async_transport.py b/tests/datastore_elasticsearch/test_async_transport.py new file mode 100644 index 000000000..21e8e0c7d --- /dev/null +++ b/tests/datastore_elasticsearch/test_async_transport.py @@ -0,0 +1,67 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from conftest import ES_SETTINGS, ES_URL, ES_VERSION, RUN_IF_V8_OR_ABOVE +from testing_support.util import instance_hostname +from testing_support.validators.validate_transaction_errors import validate_transaction_errors +from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics + +from newrelic.api.background_task import background_task +from newrelic.api.transaction import current_transaction + +try: + # v8+ + from elastic_transport._node._http_aiohttp import AiohttpHttpNode as AIOHttpConnection + from elastic_transport._node._http_httpx import HttpxAsyncHttpNode except ImportError: + # v7 + from elasticsearch._async.http_aiohttp import AIOHttpConnection + + HttpxAsyncHttpNode = None # Not implemented in v7 + +HOST = instance_hostname(ES_SETTINGS["host"]) +PORT = ES_SETTINGS["port"] + + +async def _exercise_es(es): + if ES_VERSION >= (8,): + await es.index(index="contacts", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1) + else: + await es.index( + index="contacts", doc_type="person", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1 + ) + + +@pytest.mark.parametrize( + "client_kwargs", + [ + pytest.param({"node_class": HttpxAsyncHttpNode}, id="HttpxAsyncHttpNodeV8", marks=RUN_IF_V8_OR_ABOVE), + pytest.param({"node_class": AIOHttpConnection}, id="AIOHttpConnection"), + ], +) +@validate_transaction_errors(errors=[]) +@validate_transaction_metrics( + "test_async_transport:test_async_transport_connection_classes", + rollup_metrics=[(f"Datastore/instance/Elasticsearch/{HOST}/{PORT}", 1)], + scoped_metrics=[(f"Datastore/instance/Elasticsearch/{HOST}/{PORT}", None)], + background_task=True, +) +@background_task() +def test_async_transport_connection_classes(loop, client_kwargs): + from elasticsearch
import AsyncElasticsearch + + async_client = AsyncElasticsearch(ES_URL, **client_kwargs) + loop.run_until_complete(_exercise_es(async_client)) + loop.run_until_complete(async_client.close()) diff --git a/tests/datastore_elasticsearch/test_elasticsearch.py b/tests/datastore_elasticsearch/test_elasticsearch.py index 6c79b2f76..685ea341a 100644 --- a/tests/datastore_elasticsearch/test_elasticsearch.py +++ b/tests/datastore_elasticsearch/test_elasticsearch.py @@ -13,7 +13,7 @@ # limitations under the License. import elasticsearch.client -from conftest import ES_SETTINGS, ES_VERSION +from conftest import ES_SETTINGS, IS_V8_OR_ABOVE from testing_support.fixtures import override_application_settings from testing_support.util import instance_hostname from testing_support.validators.validate_transaction_errors import validate_transaction_errors @@ -29,6 +29,9 @@ # Metrics _base_scoped_metrics = [ + ("Datastore/operation/Elasticsearch/cat.health", 1), + ("Datastore/operation/Elasticsearch/nodes.info", 1), + ("Datastore/operation/Elasticsearch/snapshot.status", 1), ("Datastore/statement/Elasticsearch/_all/cluster.health", 1), ("Datastore/statement/Elasticsearch/_all/search", 2), ("Datastore/statement/Elasticsearch/address/index", 2), @@ -39,11 +42,19 @@ ("Datastore/statement/Elasticsearch/other/search", 2), ] +_all_count = 17 _base_rollup_metrics = [ + ("Datastore/all", _all_count), + ("Datastore/allOther", _all_count), + ("Datastore/Elasticsearch/all", _all_count), + ("Datastore/Elasticsearch/allOther", _all_count), + ("Datastore/operation/Elasticsearch/cat.health", 1), ("Datastore/operation/Elasticsearch/cluster.health", 1), ("Datastore/operation/Elasticsearch/index", 5), ("Datastore/operation/Elasticsearch/indices.refresh", 1), + ("Datastore/operation/Elasticsearch/nodes.info", 1), ("Datastore/operation/Elasticsearch/search", 7), + ("Datastore/operation/Elasticsearch/snapshot.status", 1), ("Datastore/statement/Elasticsearch/_all/cluster.health", 1), ("Datastore/statement/Elasticsearch/_all/search", 2), ("Datastore/statement/Elasticsearch/address/index", 2), @@ -65,59 +76,6 @@ def is_importable(module_path): return False -_all_count = 14 - -if is_importable("elasticsearch.client.cat") or is_importable("elasticsearch._sync.client.cat"): - _base_scoped_metrics.append(("Datastore/operation/Elasticsearch/cat.health", 1)) - _base_rollup_metrics.append(("Datastore/operation/Elasticsearch/cat.health", 1)) - _all_count += 1 -else: - _base_scoped_metrics.append(("Datastore/operation/Elasticsearch/cat.health", None)) - _base_rollup_metrics.append(("Datastore/operation/Elasticsearch/cat.health", None)) - -if is_importable("elasticsearch.client.nodes") or is_importable("elasticsearch._sync.client.nodes"): - _base_scoped_metrics.append(("Datastore/operation/Elasticsearch/nodes.info", 1)) - _base_rollup_metrics.append(("Datastore/operation/Elasticsearch/nodes.info", 1)) - _all_count += 1 -else: - _base_scoped_metrics.append(("Datastore/operation/Elasticsearch/nodes.info", None)) - _base_rollup_metrics.append(("Datastore/operation/Elasticsearch/nodes.info", None)) - -if hasattr(elasticsearch.client, "SnapshotClient") and hasattr(elasticsearch.client.SnapshotClient, "status"): - _base_scoped_metrics.append(("Datastore/operation/Elasticsearch/snapshot.status", 1)) - _base_rollup_metrics.append(("Datastore/operation/Elasticsearch/snapshot.status", 1)) - _all_count += 1 -else: - _base_scoped_metrics.append(("Datastore/operation/Elasticsearch/snapshot.status", None)) - 
_base_rollup_metrics.append(("Datastore/operation/Elasticsearch/snapshot.status", None)) - -if hasattr(elasticsearch.client.IndicesClient, "status"): - _base_scoped_metrics.append(("Datastore/statement/Elasticsearch/_all/indices.status", 1)) - _base_rollup_metrics.extend( - [ - ("Datastore/operation/Elasticsearch/indices.status", 1), - ("Datastore/statement/Elasticsearch/_all/indices.status", 1), - ] - ) - _all_count += 1 -else: - _base_scoped_metrics.append(("Datastore/operation/Elasticsearch/indices.status", None)) - _base_rollup_metrics.extend( - [ - ("Datastore/operation/Elasticsearch/indices.status", None), - ("Datastore/statement/Elasticsearch/_all/indices.status", None), - ] - ) - -_base_rollup_metrics.extend( - [ - ("Datastore/all", _all_count), - ("Datastore/allOther", _all_count), - ("Datastore/Elasticsearch/all", _all_count), - ("Datastore/Elasticsearch/allOther", _all_count), - ] -) - # Instance info _disable_scoped_metrics = list(_base_scoped_metrics) @@ -199,7 +157,7 @@ def _exercise_es_v8(es): es.indices.status() -_exercise_es = _exercise_es_v7 if ES_VERSION < (8, 0, 0) else _exercise_es_v8 +_exercise_es = _exercise_es_v8 if IS_V8_OR_ABOVE else _exercise_es_v7 # Test @@ -229,3 +187,7 @@ def test_elasticsearch_operation_disabled(client): @background_task() def test_elasticsearch_operation_enabled(client): _exercise_es(client) + + +def test_elasticsearch_no_transaction(client): + _exercise_es(client) diff --git a/tests/datastore_elasticsearch/test_mget.py b/tests/datastore_elasticsearch/test_mget.py index e5f33ba4c..e6a2078f4 100644 --- a/tests/datastore_elasticsearch/test_mget.py +++ b/tests/datastore_elasticsearch/test_mget.py @@ -13,20 +13,21 @@ # limitations under the License. import pytest -from elasticsearch import Elasticsearch - -try: - from elastic_transport import RoundRobinSelector -except ImportError: - from elasticsearch.connection_pool import RoundRobinSelector - from conftest import ES_MULTIPLE_SETTINGS, ES_VERSION +from elasticsearch import Elasticsearch from testing_support.fixtures import override_application_settings from testing_support.util import instance_hostname from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics from newrelic.api.background_task import background_task +try: + # v8+ + from elastic_transport import RoundRobinSelector +except ImportError: + # v7 + from elasticsearch.connection_pool import RoundRobinSelector + # Settings _enable_instance_settings = {"datastore_tracer.instance_reporting.enabled": True} @@ -65,7 +66,12 @@ instance_metric_name_1 = f"Datastore/instance/Elasticsearch/{host_1}/{port_1}" instance_metric_name_2 = f"Datastore/instance/Elasticsearch/{host_2}/{port_2}" - _enable_rollup_metrics.extend([(instance_metric_name_1, 2), (instance_metric_name_2, 1)]) + if ES_VERSION >= (8,): + _enable_rollup_metrics.extend([(instance_metric_name_1, 2), (instance_metric_name_2, 1)]) + else: + # Cannot deterministically set the number of calls to each instance as it's random + # which node is selected first and called twice. Instead, check that both metrics are simply present. + _enable_rollup_metrics.extend([(instance_metric_name_1, "present"), (instance_metric_name_2, "present")]) _disable_rollup_metrics.extend([(instance_metric_name_1, None), (instance_metric_name_2, None)]) @@ -78,10 +84,12 @@ def client(): # doing two db calls will mean elastic search is talking to two different # dbs.
if ES_VERSION >= (8,): - client = Elasticsearch(urls, node_selector_class=RoundRobinSelector, randomize_hosts=False) + _client = Elasticsearch(urls, node_selector_class=RoundRobinSelector, randomize_nodes_in_pool=False) else: - client = Elasticsearch(urls, selector_class=RoundRobinSelector, randomize_hosts=False) - return client + _client = Elasticsearch(urls, selector_class=RoundRobinSelector, randomize_hosts=False) + + yield _client + _client.close() # Query diff --git a/tests/datastore_elasticsearch/test_multiple_dbs.py b/tests/datastore_elasticsearch/test_multiple_dbs.py index a6875aa09..70da44d51 100644 --- a/tests/datastore_elasticsearch/test_multiple_dbs.py +++ b/tests/datastore_elasticsearch/test_multiple_dbs.py @@ -77,6 +77,19 @@ def _exercise_es(es): # Test +@pytest.fixture(scope="session") +def clients(loop): + clients = [] + for db in ES_MULTIPLE_SETTINGS: + es_url = f"http://{db['host']}:{db['port']}" + clients.append(Elasticsearch(es_url)) + + yield clients + + for client in clients: + client.close() + + @pytest.mark.skipif(len(ES_MULTIPLE_SETTINGS) < 2, reason="Test environment not configured with multiple databases.") @override_application_settings(_enable_instance_settings) @validate_transaction_metrics( @@ -86,10 +99,8 @@ def _exercise_es(es): background_task=True, ) @background_task() -def test_multiple_dbs_enabled(): - for db in ES_MULTIPLE_SETTINGS: - es_url = f"http://{db['host']}:{db['port']}" - client = Elasticsearch(es_url) +def test_multiple_dbs_enabled(clients): + for client in clients: _exercise_es(client) @@ -102,8 +113,6 @@ def test_multiple_dbs_enabled(): background_task=True, ) @background_task() -def test_multiple_dbs_disabled(): - for db in ES_MULTIPLE_SETTINGS: - es_url = f"http://{db['host']}:{db['port']}" - client = Elasticsearch(es_url) +def test_multiple_dbs_disabled(clients): + for client in clients: _exercise_es(client) diff --git a/tests/datastore_elasticsearch/test_transport.py b/tests/datastore_elasticsearch/test_transport.py index 287d25a7e..6efe26ad0 100644 --- a/tests/datastore_elasticsearch/test_transport.py +++ b/tests/datastore_elasticsearch/test_transport.py @@ -13,94 +13,64 @@ # limitations under the License.
import pytest -from conftest import ES_SETTINGS, ES_VERSION -from elasticsearch.serializer import JSONSerializer +from conftest import ES_SETTINGS, ES_URL, ES_VERSION, RUN_IF_V7_OR_BELOW, RUN_IF_V8_OR_ABOVE +from testing_support.util import instance_hostname +from testing_support.validators.validate_transaction_errors import validate_transaction_errors +from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics from newrelic.api.background_task import background_task from newrelic.api.transaction import current_transaction try: - from elasticsearch.connection.http_requests import RequestsHttpConnection - from elasticsearch.connection.http_urllib3 import Urllib3HttpConnection - from elasticsearch.transport import Transport - - NodeConfig = dict -except ImportError: + # v8+ from elastic_transport._models import NodeConfig from elastic_transport._node._http_requests import RequestsHttpNode as RequestsHttpConnection from elastic_transport._node._http_urllib3 import Urllib3HttpNode as Urllib3HttpConnection - from elastic_transport._transport import Transport - - -IS_V8 = ES_VERSION >= (8,) -IS_V7 = ES_VERSION >= (7,) and ES_VERSION < (8, 0) -IS_BELOW_V7 = ES_VERSION < (7,) - -RUN_IF_V8 = pytest.mark.skipif(IS_V7 or IS_BELOW_V7, reason="Only run for v8+") -RUN_IF_V7 = pytest.mark.skipif(IS_V8 or IS_BELOW_V7, reason="Only run for v7") -RUN_IF_BELOW_V7 = pytest.mark.skipif(not IS_BELOW_V7, reason="Only run for versions below v7") +except ImportError: + # v7 + from elasticsearch.connection.http_requests import RequestsHttpConnection + from elasticsearch.connection.http_urllib3 import Urllib3HttpConnection + NodeConfig = dict -HOST = NodeConfig(scheme="http", host=ES_SETTINGS["host"], port=int(ES_SETTINGS["port"])) +HOST = instance_hostname(ES_SETTINGS["host"]) +PORT = ES_SETTINGS["port"] -METHOD = "/contacts/person/1" -HEADERS = {"Content-Type": "application/json"} -DATA = {"name": "Joe Tester"} -BODY = JSONSerializer().dumps(DATA) -if hasattr(BODY, "encode"): - BODY = BODY.encode("utf-8") +def _exercise_es(es): + if ES_VERSION >= (8,): + es.index(index="contacts", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1) + else: + es.index( + index="contacts", doc_type="person", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1 + ) @pytest.mark.parametrize( - "transport_kwargs, perform_request_kwargs", + "client_kwargs", [ - pytest.param({}, {"body": DATA}, id="DefaultTransport_below_v7", marks=RUN_IF_BELOW_V7), - pytest.param({}, {"headers": HEADERS, "body": DATA}, id="DefaultTransport_v7+", marks=RUN_IF_V7 or RUN_IF_V8), - pytest.param( - {"connection_class": Urllib3HttpConnection}, - {"body": DATA}, - id="Urllib3HttpConnectionv7", - marks=RUN_IF_BELOW_V7, - ), + pytest.param({}, id="DefaultTransport"), pytest.param( - {"connection_class": RequestsHttpConnection}, - {"body": DATA}, - id="RequestsHttpConnectionv7", - marks=RUN_IF_BELOW_V7, + {"connection_class": Urllib3HttpConnection}, id="Urllib3HttpConnectionv7", marks=RUN_IF_V7_OR_BELOW ), pytest.param( - {"connection_class": Urllib3HttpConnection}, - {"headers": HEADERS, "body": DATA}, - id="Urllib3HttpConnectionv7", - marks=RUN_IF_V7, - ), - pytest.param( - {"connection_class": RequestsHttpConnection}, - {"headers": HEADERS, "body": DATA}, - id="RequestsHttpConnectionv7", - marks=RUN_IF_V7, - ), - pytest.param( - {"node_class": Urllib3HttpConnection}, - {"headers": HEADERS, "body": DATA}, - id="Urllib3HttpNodev8", - marks=RUN_IF_V8, - ), - pytest.param( - {"node_class": 
RequestsHttpConnection}, - {"headers": HEADERS, "body": DATA}, - id="RequestsHttpNodev8", - marks=RUN_IF_V8, + {"connection_class": RequestsHttpConnection}, id="RequestsHttpConnectionv7", marks=RUN_IF_V7_OR_BELOW ), + pytest.param({"node_class": Urllib3HttpConnection}, id="Urllib3HttpNodev8", marks=RUN_IF_V8_OR_ABOVE), + pytest.param({"node_class": RequestsHttpConnection}, id="RequestsHttpNodev8", marks=RUN_IF_V8_OR_ABOVE), ], ) +@validate_transaction_errors(errors=[]) +@validate_transaction_metrics( + "test_transport:test_transport_connection_classes", + rollup_metrics=[(f"Datastore/instance/Elasticsearch/{HOST}/{PORT}", 1)], + scoped_metrics=[(f"Datastore/instance/Elasticsearch/{HOST}/{PORT}", None)], + background_task=True, +) @background_task() -def test_transport_connection_classes(transport_kwargs, perform_request_kwargs): - transaction = current_transaction() - - transport = Transport([HOST], **transport_kwargs) - transport.perform_request("POST", METHOD, **perform_request_kwargs) +def test_transport_connection_classes(client_kwargs): + from elasticsearch import Elasticsearch - expected = (ES_SETTINGS["host"], ES_SETTINGS["port"], None) - assert transaction._nr_datastore_instance_info == expected + client = Elasticsearch(ES_URL, **client_kwargs) + with client: + _exercise_es(client) diff --git a/tox.ini b/tox.ini index 502f3bfb9..a55832a4d 100644 --- a/tox.ini +++ b/tox.ini @@ -262,8 +262,9 @@ deps = datastore_cassandradriver-cassandralatest: cassandra-driver datastore_cassandradriver-cassandralatest: twisted datastore_elasticsearch: requests - datastore_elasticsearch-elasticsearch07: elasticsearch<8.0 - datastore_elasticsearch-elasticsearch08: elasticsearch<9.0 + datastore_elasticsearch: httpx + datastore_elasticsearch-elasticsearch07: elasticsearch[async]<8.0 + datastore_elasticsearch-elasticsearch08: elasticsearch[async]<9.0 datastore_firestore: google-cloud-firestore datastore_memcache-memcached01: python-memcached<2 datastore_mysql-mysqllatest: mysql-connector-python
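
Reviewer note: the sketch below is not part of the change set; it is one way to exercise the new async instrumentation by hand, mirroring what test_async_transport.py does in-process: an AsyncElasticsearch client driven from a background_task-decorated function so datastore metrics are recorded inside a transaction. The config file path, application registration, Elasticsearch URL, and index name are illustrative assumptions rather than values taken from this patch.

# Minimal manual sketch, assuming a newrelic.ini config file and an
# Elasticsearch node reachable at http://localhost:9200 (both hypothetical).
import asyncio

from elasticsearch import AsyncElasticsearch

import newrelic.agent
from newrelic.api.background_task import background_task

newrelic.agent.initialize("newrelic.ini")  # assumed config location
newrelic.agent.register_application(timeout=10.0)


async def _index_document(es):
    # Same call shape as the new async tests; should produce a
    # Datastore/statement/Elasticsearch/contacts/index metric plus rollups.
    await es.index(index="contacts", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1)
    await es.close()


@background_task(name="exercise_async_elasticsearch")
def exercise():
    # Datastore traces are only recorded inside a transaction;
    # background_task starts one for this function.
    es = AsyncElasticsearch("http://localhost:9200")  # assumed local node
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(_index_document(es))
    finally:
        loop.close()


if __name__ == "__main__":
    exercise()
    newrelic.agent.shutdown_agent(timeout=10.0)

Running the sketch once against elasticsearch[async]<8.0 and once against an 8.x install touches both sides of the try/except import in test_async_transport.py (AIOHttpConnection for v7; the aiohttp and httpx node classes for v8+), matching the two tox environments updated above.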