1
1
from opensearchpy import NotFoundError , OpenSearch
2
2
3
3
from benchmark .dataset import Dataset
4
- from engine .base_client import IncompatibilityError
5
4
from engine .base_client .configure import BaseConfigurator
6
5
from engine .base_client .distances import Distance
7
6
from engine .clients .opensearch .config import (
10
9
OPENSEARCH_PORT ,
11
10
OPENSEARCH_USER ,
12
11
)
12
+ from engine .clients .opensearch .utils import get_index_thread_qty
13
13
14
14
15
15
class OpenSearchConfigurator (BaseConfigurator ):
@@ -40,28 +40,37 @@ def __init__(self, host, collection_params: dict, connection_params: dict):
40
40
)
41
41
42
42
def clean(self):
    """Delete the benchmark index, if present, so each run starts fresh.

    Checks for the index first so we can log before deleting, but still
    tolerates the index vanishing between the ``exists()`` check and the
    ``delete()`` call (e.g. a concurrent cleanup) by swallowing
    ``NotFoundError`` — check-then-act alone is race-prone.
    """
    index_exists = self.client.indices.exists(
        index=OPENSEARCH_INDEX,
        params={
            "timeout": 300,
        },
    )
    if index_exists:
        print(f"Deleting index: {OPENSEARCH_INDEX}, as it is already present")
        try:
            self.client.indices.delete(
                index=OPENSEARCH_INDEX,
                params={
                    "timeout": 300,
                },
            )
        except NotFoundError:
            # Index disappeared after the exists() check; the desired
            # end state (no index) is already achieved.
            pass
52
56
53
57
def recreate (self , dataset : Dataset , collection_params ):
54
- if dataset .config .distance == Distance .DOT :
55
- raise IncompatibilityError
56
- if dataset .config .vector_size > 1024 :
57
- raise IncompatibilityError
58
+ self ._update_cluster_settings ()
59
+ distance = self .DISTANCE_MAPPING [dataset .config .distance ]
60
+ if dataset .config .distance == Distance .COSINE :
61
+ distance = self .DISTANCE_MAPPING [Distance .DOT ]
62
+ print (f"Using distance type: { distance } as dataset distance is : { dataset .config .distance } " )
58
63
59
64
self .client .indices .create (
60
65
index = OPENSEARCH_INDEX ,
61
66
body = {
62
67
"settings" : {
63
68
"index" : {
64
69
"knn" : True ,
70
+ "refresh_interval" : - 1 ,
71
+ "number_of_replicas" : 0 if collection_params .get ("number_of_replicas" ) == None else collection_params .get ("number_of_replicas" ),
72
+ "number_of_shards" : 1 if collection_params .get ("number_of_shards" ) == None else collection_params .get ("number_of_shards" ),
73
+ "knn.advanced.approximate_threshold" : "-1"
65
74
}
66
75
},
67
76
"mappings" : {
@@ -72,18 +81,13 @@ def recreate(self, dataset: Dataset, collection_params):
72
81
"method" : {
73
82
** {
74
83
"name" : "hnsw" ,
75
- "engine" : "lucene" ,
76
- "space_type" : self .DISTANCE_MAPPING [
77
- dataset .config .distance
78
- ],
79
- "parameters" : {
80
- "m" : 16 ,
81
- "ef_construction" : 100 ,
82
- },
84
+ "engine" : "faiss" ,
85
+ "space_type" : distance ,
86
+ ** collection_params .get ("method" )
83
87
},
84
- ** collection_params .get ("method" ),
85
88
},
86
89
},
90
+ # this doesn't work for nmslib, we need see what to do here, may be remove them
87
91
** self ._prepare_fields_config (dataset ),
88
92
}
89
93
},
@@ -94,6 +98,16 @@ def recreate(self, dataset: Dataset, collection_params):
94
98
cluster_manager_timeout = "5m" ,
95
99
)
96
100
101
def _update_cluster_settings(self):
    """Apply persistent cluster settings tuned for vector-search benchmarking."""
    thread_qty = get_index_thread_qty(self.client)
    # Raise the k-NN circuit breaker limit so that even a small cluster
    # keeps good vector-search latencies, and size the k-NN indexing
    # thread pool from the cluster itself.
    settings = {
        "persistent": {
            "knn.memory.circuit_breaker.limit": "75%",
            "knn.algo_param.index_thread_qty": thread_qty,
        }
    }
    self.client.cluster.put_settings(settings)
97
111
def _prepare_fields_config (self , dataset : Dataset ):
98
112
return {
99
113
field_name : {
@@ -104,3 +118,9 @@ def _prepare_fields_config(self, dataset: Dataset):
104
118
}
105
119
for field_name , field_type in dataset .config .schema .items ()
106
120
}
121
+
122
def execution_params(self, distance, vector_size) -> dict:
    """Return per-engine execution options for the given dataset config.

    Cosine similarity is served via inner product over normalized
    vectors, so request client-side normalization in that case;
    every other distance needs no extra options.
    """
    return {"normalize": "true"} if distance == Distance.COSINE else {}
0 commit comments