
Added optimizations for the OpenSearch engine that were missing #7


Open · wants to merge 1 commit into opensearch.improvements
2 changes: 1 addition & 1 deletion engine/clients/opensearch/config.py
@@ -47,7 +47,7 @@ def get_opensearch_client(host, connection_params):
     return client


-def _wait_for_es_status(client, status="yellow"):
+def _wait_for_os_status(client, status="yellow"):
     print(f"waiting for OpenSearch cluster health {status} status...")
     for _ in range(100):
         try:
51 changes: 41 additions & 10 deletions engine/clients/opensearch/configure.py
@@ -44,19 +44,49 @@ def recreate(self, dataset: Dataset, collection_params):
         if dataset.config.vector_size > 16000:
             raise IncompatibilityError

-        index_settings = (
-            {
-                "knn": True,
-                "number_of_replicas": 0,
-                "refresh_interval": -1,  # no refresh is required because we index all the data at once
-            },
-        )
+        nodes_stats_res = self.client.nodes.info(filter_path="nodes.*.roles,nodes.*.os")
+        nodes_data = nodes_stats_res.get("nodes")
+
+        data_node_count = 0
+        total_processors = 0
+        for node_id in nodes_data:
+            node_info = nodes_data.get(node_id)
+            roles = node_info["roles"]
+            os_info = node_info["os"]
+            if "data" in roles:
+                data_node_count += 1
+                total_processors += int(os_info["allocated_processors"])
+
+        processors_per_node = total_processors // data_node_count
+
+        index_thread_qty = max(1, processors_per_node // 2)
+
+        cluster_settings_body = {
+            "persistent": {
+                "knn.memory.circuit_breaker.limit": "75%",
+                "knn.algo_param.index_thread_qty": index_thread_qty,
+            }
+        }
+
+        self.client.cluster.put_settings(cluster_settings_body)
+
+        index_settings = {
+            "knn": True,
+            "number_of_replicas": 0,
+            "refresh_interval": -1,  # no refresh is required because we index all the data at once
+        }
         index_config = collection_params.get("index")

         # if we specify the number_of_shards on the config, enforce it; otherwise use the default
-        if "number_of_shards" in index_config:
+        if index_config is not None and "number_of_shards" in index_config:
             index_settings["number_of_shards"] = 1

+        field_config = self._prepare_fields_config(dataset)
+
+        engine = "faiss"
+        if field_config == {} or field_config is None:
+            engine = "nmslib"
+
         # Followed the below link for tuning for ingestion and querying
         # https://opensearch.org/docs/1.1/search-plugins/knn/performance-tuning/#indexing-performance-tuning
         self.client.indices.create(
@@ -73,7 +103,7 @@ def recreate(self, dataset: Dataset, collection_params):
                         "method": {
                             **{
                                 "name": "hnsw",
-                                "engine": "lucene",
+                                "engine": engine,
                                 "space_type": self.DISTANCE_MAPPING[
                                     dataset.config.distance
                                 ],
@@ -85,7 +115,8 @@ def recreate(self, dataset: Dataset, collection_params):
                             **collection_params.get("method"),
                         },
                     },
-                    **self._prepare_fields_config(dataset),
+                    # this doesn't work for nmslib; we need to see what to do here, maybe remove them
+                    **field_config
                 }
             },
         },
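The heart of this change is sizing `knn.algo_param.index_thread_qty` from the data nodes' allocated processors before the index is created, following the tuning guide linked in the diff. A minimal standalone sketch of that sizing logic, assuming an unauthenticated local cluster (the host and port are illustrative; in the benchmark the client would come from `get_opensearch_client()`):

```python
from opensearchpy import OpenSearch

# Illustrative local client, standing in for get_opensearch_client().
client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}], use_ssl=False)

# Gather roles and allocated processors for every node in the cluster.
nodes = client.nodes.info(filter_path="nodes.*.roles,nodes.*.os")["nodes"]
data_nodes = [n for n in nodes.values() if "data" in n["roles"]]

# Average processors per data node, then give half of them to k-NN graph
# builds, matching the heuristic used in recreate() above.
per_node = sum(int(n["os"]["allocated_processors"]) for n in data_nodes) // len(data_nodes)
index_thread_qty = max(1, per_node // 2)

client.cluster.put_settings({
    "persistent": {
        "knn.memory.circuit_breaker.limit": "75%",
        "knn.algo_param.index_thread_qty": index_thread_qty,
    }
})
```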
10 changes: 8 additions & 2 deletions engine/clients/opensearch/search.py
@@ -1,8 +1,8 @@
 import multiprocessing as mp
+import uuid
 from typing import List, Tuple

 import backoff
-
 from opensearchpy import OpenSearch
 from opensearchpy.exceptions import TransportError

@@ -75,9 +75,12 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
             params={
                 "timeout": OPENSEARCH_TIMEOUT,
             },
+            _source=False,
+            docvalue_fields=["_id"],
+            stored_fields="_none_",
         )
         return [
-            (uuid.UUID(hex=hit["_id"]).int, hit["_score"])
+            (uuid.UUID(hex=hit["fields"]["_id"][0]).int, hit["_score"])
             for hit in res["hits"]["hits"]
         ]

@@ -87,3 +90,6 @@ def setup_search(cls):
         cls.client.indices.put_settings(
             body=cls.search_params["config"], index=OPENSEARCH_INDEX
         )
+        # Load the graphs into memory
+        warmup_endpoint = f"/_plugins/_knn/warmup/{OPENSEARCH_INDEX}"
+        print(cls.client.transport.perform_request("GET", warmup_endpoint))
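For context on the `fields` lookup above: `_source=False` skips the source fetch, `stored_fields="_none_"` suppresses stored fields (including the stored `_id`), and `docvalue_fields=["_id"]` returns the id from doc values instead, which is why each hit is read as `hit["fields"]["_id"][0]`. A minimal sketch of such a query; the index and vector field names are illustrative:

```python
from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}], use_ssl=False)

res = client.search(
    index="bench-index",  # illustrative index name
    body={
        "size": 10,
        "query": {"knn": {"vector": {"vector": [0.1, 0.2, 0.3], "k": 10}}},
    },
    _source=False,            # don't fetch the _source blob
    docvalue_fields=["_id"],  # read the id back from doc values
    stored_fields="_none_",   # suppress stored fields entirely
)
for hit in res["hits"]["hits"]:
    print(hit["fields"]["_id"][0], hit["_score"])
```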
14 changes: 12 additions & 2 deletions engine/clients/opensearch/upload.py
@@ -1,5 +1,6 @@
 import multiprocessing as mp
 import uuid
+import time
 from typing import List

 import backoff
@@ -12,7 +13,7 @@
     OPENSEARCH_BULK_INDEX_TIMEOUT,
     OPENSEARCH_FULL_INDEX_TIMEOUT,
     OPENSEARCH_INDEX,
-    _wait_for_es_status,
+    _wait_for_os_status,
     get_opensearch_client,
 )

@@ -75,6 +76,15 @@ def upload_batch(cls, batch: List[Record]):
         on_backoff=_index_backoff_handler,
     )
     def post_upload(cls, _distance):
+        force_merge_endpoint = f"/{OPENSEARCH_INDEX}/_forcemerge?max_num_segments=1&wait_for_completion=false"
+        force_merge_task_id = cls.client.transport.perform_request("POST", force_merge_endpoint)["task"]
+        SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC = 30
+        while True:
+            time.sleep(SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC)
+            task_status = cls.client.tasks.get(task_id=force_merge_task_id)
+            if task_status["completed"]:
+                break
+
         print(
             "Updated the index settings back to the default and waiting for indexing to be completed."
         )
@@ -84,7 +94,7 @@ def post_upload(cls, _distance):
             index=OPENSEARCH_INDEX,
             body={"index": {"refresh_interval": refresh_interval}},
         )
-        _wait_for_es_status(cls.client)
+        _wait_for_os_status(cls.client)
         return {}

     def get_memory_usage(cls):
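The force merge above is submitted with `wait_for_completion=false`, so the request returns a task id immediately instead of holding an HTTP connection open for a potentially very long merge; the task is then polled until it completes. The same pattern in isolation, with an illustrative index name and poll interval:

```python
import time

from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}], use_ssl=False)

# Start merging "bench-index" down to a single segment without blocking the request.
endpoint = "/bench-index/_forcemerge?max_num_segments=1&wait_for_completion=false"
task_id = client.transport.perform_request("POST", endpoint)["task"]

# Poll the tasks API until the merge reports completed.
while not client.tasks.get(task_id=task_id)["completed"]:
    time.sleep(30)
```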
15 changes: 12 additions & 3 deletions engine/servers/opensearch-single-node-ci/docker-compose.yaml
@@ -2,11 +2,20 @@ version: '3.5'

 services:
   opensearch:
-    image: opensearchproject/opensearch:2.10.0
+    image: opensearchproject/opensearch:2.14.0
     environment:
       discovery.type: "single-node"
-      plugins.security.disabled: true
       OPENSEARCH_JAVA_OPTS: "-Xms2g -Xmx2g"
+      DISABLE_SECURITY_PLUGIN: true
+      DISABLE_INSTALL_DEMO_CONFIG: true
+      bootstrap.memory_lock: true
+    ulimits:
+      memlock:
+        soft: -1  # Set memlock to unlimited (no soft or hard limit)
+        hard: -1
+      nofile:
+        soft: 65536  # Maximum number of open files for the opensearch user - set to at least 65536
+        hard: 65536
     ports:
       - "9200:9200"
       - "9300:9300"
@@ -18,4 +27,4 @@ services:
     deploy:
       resources:
         limits:
-          memory: 4Gb
+          memory: 5Gb
14 changes: 12 additions & 2 deletions engine/servers/opensearch-single-node/docker-compose.yaml
@@ -2,11 +2,21 @@ version: '3.5'

 services:
   opensearch:
-    image: opensearchproject/opensearch:2.10.0
+    image: opensearchproject/opensearch:2.14.0
     environment:
       discovery.type: "single-node"
-      plugins.security.disabled: true
       OPENSEARCH_JAVA_OPTS: "-Xms4g -Xmx4g"
+      DISABLE_SECURITY_PLUGIN: true
+      DISABLE_INSTALL_DEMO_CONFIG: true
+      bootstrap.memory_lock: true
+    ulimits:
+      memlock:
+        soft: -1  # Set memlock to unlimited (no soft or hard limit)
+        hard: -1
+      nofile:
+        soft: 65536  # Maximum number of open files for the opensearch user - set to at least 65536
+        hard: 65536
+
     ports:
       - "9200:9200"
       - "9300:9300"