@@ -31,6 +31,7 @@ class OceanBaseVectorConfig(BaseModel):
31
31
user : str
32
32
password : str
33
33
database : str
34
+ enable_hybrid_search : bool = False
34
35
35
36
@model_validator (mode = "before" )
36
37
@classmethod
@@ -57,6 +58,7 @@ def __init__(self, collection_name: str, config: OceanBaseVectorConfig):
57
58
password = self ._config .password ,
58
59
db_name = self ._config .database ,
59
60
)
61
+ self ._hybrid_search_enabled = self ._check_hybrid_search_support () # Check if hybrid search is supported
60
62
61
63
def get_type(self) -> str:
    """Return the vector store type identifier used for OceanBase."""
    store_type = VectorType.OCEANBASE
    return store_type
@@ -98,6 +100,16 @@ def _create_collection(self) -> None:
98
100
columns = cols ,
99
101
vidxs = vidx_params ,
100
102
)
103
+ try :
104
+ if self ._hybrid_search_enabled :
105
+ self ._client .perform_raw_text_sql (f"""ALTER TABLE { self ._collection_name }
106
+ ADD FULLTEXT INDEX fulltext_index_for_col_text (text) WITH PARSER ik""" )
107
+ except Exception as e :
108
+ raise Exception (
109
+ "Failed to add fulltext index to the target table, your OceanBase version must be 4.3.5.1 or above "
110
+ + "to support fulltext index and vector index in the same table" ,
111
+ e ,
112
+ )
101
113
vals = []
102
114
params = self ._client .perform_raw_text_sql ("SHOW PARAMETERS LIKE '%ob_vector_memory_limit_percentage%'" )
103
115
for row in params :
@@ -116,6 +128,27 @@ def _create_collection(self) -> None:
116
128
)
117
129
redis_client .set (collection_exist_cache_key , 1 , ex = 3600 )
118
130
131
def _check_hybrid_search_support(self) -> bool:
    """
    Check if the current OceanBase version supports hybrid search.

    Hybrid search is reported as unsupported without querying the server
    when ``enable_hybrid_search`` is off in the config.

    Returns:
        True if the feature is enabled and the server version is >= 4.3.5.1,
        otherwise False. Any failure while reading or parsing the version is
        logged as a warning and also yields False.
    """
    if not self._config.enable_hybrid_search:
        return False

    try:
        from packaging import version

        # Expected shape of the value:
        # OceanBase_CE 4.3.5.1 (r101000042025031818-bxxxx) (Built Mar 18 2025 18:13:36)
        result = self._client.perform_raw_text_sql("SELECT @@version_comment AS version")
        ob_full_version = result.fetchone()[0]
        # The second whitespace-separated token is the dotted version number.
        ob_version = ob_full_version.split()[1]
        logger.debug("Current OceanBase version is %s", ob_version)

        # Compare the numeric release tuples. ``base_version`` is a plain string,
        # so comparing it would be lexicographic ("4.3.10" < "4.3.5.1").
        current_release = version.parse(ob_version).release
        required_release = version.parse("4.3.5.1").release
        return current_release >= required_release
    except Exception as e:
        # Best effort: an unreadable version must not break initialization.
        logger.warning("Failed to check OceanBase version: %s. Disabling hybrid search.", e)
        return False
151
+
119
152
def add_texts (self , documents : list [Document ], embeddings : list [list [float ]], ** kwargs ):
120
153
ids = self ._get_uuids (documents )
121
154
for id , doc , emb in zip (ids , documents , embeddings ):
@@ -130,7 +163,7 @@ def add_texts(self, documents: list[Document], embeddings: list[list[float]], **
130
163
)
131
164
132
165
def text_exists(self, id: str) -> bool:
    """Report whether the collection holds a row matching the given id.

    Args:
        id: Identifier to look up; forwarded to the client as ``ids``.

    Returns:
        True when the lookup result reports a non-zero row count.
    """
    lookup = self._client.get(
        table_name=self._collection_name,
        ids=id,
    )
    found = lookup.rowcount != 0
    return bool(found)
135
168
136
169
def delete_by_ids (self , ids : list [str ]) -> None :
@@ -139,9 +172,12 @@ def delete_by_ids(self, ids: list[str]) -> None:
139
172
self ._client .delete (table_name = self ._collection_name , ids = ids )
140
173
141
174
def get_ids_by_metadata_field(self, key: str, value: str) -> list[str]:
    """Return the ids of rows whose metadata field ``key`` equals ``value``.

    Args:
        key: Name of the top-level field inside the ``metadata`` JSON column.
        value: Value the field is compared against, as a string.

    Returns:
        The ``id`` column of every matching row.
    """
    from sqlalchemy import text

    # Bind the JSON path and the value instead of formatting them into the SQL
    # text: this closes the injection hole and stops a ":name" sequence inside
    # the value from being parsed as a bind parameter.
    # JSON_UNQUOTE(JSON_EXTRACT(col, path)) is the function form of col->>path
    # and, unlike the operator, does not require the path to be a literal.
    condition = text(
        "JSON_UNQUOTE(JSON_EXTRACT(metadata, :json_path)) = :field_value"
    ).bindparams(
        json_path=f"$.{key}",
        # The original always compared against a quoted string literal.
        field_value=str(value),
    )
    cur = self._client.get(
        table_name=self._collection_name,
        ids=None,
        where_clause=[condition],
        output_column_name=["id"],
    )
    return [row[0] for row in cur]
@@ -151,36 +187,84 @@ def delete_by_metadata_field(self, key: str, value: str) -> None:
151
187
self .delete_by_ids (ids )
152
188
153
189
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
    """Run a full-text MATCH ... AGAINST search over the ``text`` column.

    Args:
        query: Search string, passed to the database as a bound parameter.
        **kwargs:
            top_k: Maximum number of rows to return; a positive int, default 5.
            document_ids_filter: Optional iterable of document ids; when given,
                only rows whose ``metadata.document_id`` is in it are returned.

    Returns:
        Documents ordered by descending match score, with the score stored
        under ``metadata["score"]``. An empty list is returned when hybrid
        search is disabled or when anything fails (the failure, including an
        invalid ``top_k``, is logged as a warning rather than raised).
    """
    if not self._hybrid_search_enabled:
        return []

    try:
        from sqlalchemy import text

        top_k = kwargs.get("top_k", 5)
        # bool is a subclass of int; reject it so "LIMIT True" is never rendered.
        if isinstance(top_k, bool) or not isinstance(top_k, int) or top_k <= 0:
            raise ValueError("top_k must be a positive integer")

        params: dict[str, Any] = {"query": query}

        where_clause = ""
        document_ids_filter = kwargs.get("document_ids_filter")
        if document_ids_filter:
            # One named bind parameter per id instead of quoting ids into the SQL.
            placeholders = []
            for index, document_id in enumerate(document_ids_filter):
                param_name = f"document_id_{index}"
                params[param_name] = str(document_id)
                placeholders.append(f":{param_name}")
            where_clause = f" AND metadata->>'$.document_id' IN ({', '.join(placeholders)})"

        # The table name is an identifier and cannot be bound; top_k is a
        # validated int, so formatting it in is safe.
        full_sql = f"""SELECT metadata, text, MATCH (text) AGAINST (:query) AS score
        FROM {self._collection_name}
        WHERE MATCH (text) AGAINST (:query) > 0
        {where_clause}
        ORDER BY score DESC
        LIMIT {top_k}"""

        with self._client.engine.connect() as conn:
            with conn.begin():
                rows = conn.execute(text(full_sql), params).fetchall()

        docs = []
        for metadata_str, _text, score in rows:
            try:
                metadata = json.loads(metadata_str)
            except (json.JSONDecodeError, TypeError):
                # A single bad row should not discard the whole result set.
                logger.warning("Invalid JSON metadata: %s", metadata_str)
                metadata = {}
            metadata["score"] = score
            docs.append(Document(page_content=_text, metadata=metadata))

        return docs
    except Exception as e:
        # Best effort: full-text search failures degrade to "no results".
        logger.warning("Failed to fulltext search: %s.", e)
        return []
155
233
156
234
def search_by_vector (self , query_vector : list [float ], ** kwargs : Any ) -> list [Document ]:
157
235
document_ids_filter = kwargs .get ("document_ids_filter" )
158
- where_clause = None
236
+ _where_clause = None
159
237
if document_ids_filter :
160
238
document_ids = ", " .join (f"'{ id } '" for id in document_ids_filter )
161
239
where_clause = f"metadata->>'$.document_id' in ({ document_ids } )"
240
+ from sqlalchemy import text
241
+
242
+ _where_clause = [text (where_clause )]
162
243
ef_search = kwargs .get ("ef_search" , self ._hnsw_ef_search )
163
244
if ef_search != self ._hnsw_ef_search :
164
245
self ._client .set_ob_hnsw_ef_search (ef_search )
165
246
self ._hnsw_ef_search = ef_search
166
247
topk = kwargs .get ("top_k" , 10 )
167
- cur = self ._client .ann_search (
168
- table_name = self ._collection_name ,
169
- vec_column_name = "vector" ,
170
- vec_data = query_vector ,
171
- topk = topk ,
172
- distance_func = func .l2_distance ,
173
- output_column_names = ["text" , "metadata" ],
174
- with_dist = True ,
175
- where_clause = where_clause ,
176
- )
248
+ try :
249
+ cur = self ._client .ann_search (
250
+ table_name = self ._collection_name ,
251
+ vec_column_name = "vector" ,
252
+ vec_data = query_vector ,
253
+ topk = topk ,
254
+ distance_func = func .l2_distance ,
255
+ output_column_names = ["text" , "metadata" ],
256
+ with_dist = True ,
257
+ where_clause = _where_clause ,
258
+ )
259
+ except Exception as e :
260
+ raise Exception ("Failed to search by vector. " , e )
177
261
docs = []
178
- for text , metadata , distance in cur :
262
+ for _text , metadata , distance in cur :
179
263
metadata = json .loads (metadata )
180
264
metadata ["score" ] = 1 - distance / math .sqrt (2 )
181
265
docs .append (
182
266
Document (
183
- page_content = text ,
267
+ page_content = _text ,
184
268
metadata = metadata ,
185
269
)
186
270
)
0 commit comments