Skip to content

add lancedb to benchmark #206

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions engine/clients/client_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
ElasticSearcher,
ElasticUploader,
)
from engine.clients.lancedb import LancedbConfigurator, LancedbSearcher, LancedbUploader
from engine.clients.milvus import MilvusConfigurator, MilvusSearcher, MilvusUploader
from engine.clients.opensearch import (
OpenSearchConfigurator,
Expand Down Expand Up @@ -39,6 +40,7 @@
"opensearch": OpenSearchConfigurator,
"redis": RedisConfigurator,
"pgvector": PgVectorConfigurator,
"lancedb": LancedbConfigurator,
}

ENGINE_UPLOADERS = {
Expand All @@ -49,6 +51,7 @@
"opensearch": OpenSearchUploader,
"redis": RedisUploader,
"pgvector": PgVectorUploader,
"lancedb": LancedbUploader,
}

ENGINE_SEARCHERS = {
Expand All @@ -59,6 +62,7 @@
"opensearch": OpenSearchSearcher,
"redis": RedisSearcher,
"pgvector": PgVectorSearcher,
"lancedb": LancedbSearcher,
}


Expand Down
9 changes: 9 additions & 0 deletions engine/clients/lancedb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from engine.clients.lancedb.configure import LancedbConfigurator
from engine.clients.lancedb.search import LancedbSearcher
from engine.clients.lancedb.upload import LancedbUploader

# Public API of the LanceDB benchmark-client package; these three classes
# are what engine.clients.client_factory registers for the "lancedb" engine.
__all__ = [
    "LancedbConfigurator",
    "LancedbSearcher",
    "LancedbUploader",
]
3 changes: 3 additions & 0 deletions engine/clients/lancedb/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import os

# Name of the LanceDB table shared by the configure/upload/search clients;
# overridable via the LANCEDB_COLLECTION_NAME environment variable.
LANCEDB_COLLECTION_NAME = os.getenv("LANCEDB_COLLECTION_NAME", "benchmark")
55 changes: 55 additions & 0 deletions engine/clients/lancedb/configure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import lancedb
import pyarrow as pa

from benchmark.dataset import Dataset
from engine.base_client.configure import BaseConfigurator
from engine.base_client.distances import Distance
from engine.clients.lancedb.config import LANCEDB_COLLECTION_NAME


class LancedbConfigurator(BaseConfigurator):
    """Creates and drops the LanceDB table used by the benchmark.

    LanceDB is embedded: the "connection" is a local data directory
    (``~/.lancedb``), not a network endpoint, so ``host`` is unused.
    """

    # Benchmark Distance enum -> LanceDB metric name.
    DISTANCE_MAPPING = {
        Distance.L2: "l2",
        Distance.COSINE: "cosine",
        Distance.DOT: "dot",
    }

    # Dataset schema type name -> pyarrow dtype for payload columns.
    DTYPE_MAPPING = {
        "int": pa.int64(),
        "keyword": pa.string(),
        "text": pa.string(),
        "float": pa.float32(),
    }

    def __init__(self, host, collection_params: dict, connection_params: dict):
        super().__init__(host, collection_params, connection_params)
        # NOTE(review): the data directory is hard-coded and `host` is
        # ignored — confirm this is intended for the embedded engine.
        uri = "~/.lancedb"
        self.client = lancedb.connect(uri, **connection_params)

    def clean(self):
        """Drop the benchmark table and all associated data.

        This is destructive and not reversible.  Failures (e.g. the table
        does not exist yet on a fresh run) are deliberately ignored so
        cleanup never aborts a benchmark.
        """
        # Fix: `(Exception, ValueError)` was redundant — ValueError is a
        # subclass of Exception.  Best-effort semantics are preserved.
        try:
            self.client.drop_table(LANCEDB_COLLECTION_NAME)
        except Exception:
            pass

    def recreate(self, dataset: Dataset, collection_params):
        """Create the table: a fixed-size float32 vector column, an int64
        ``id`` column, and one column per dataset payload field.

        Raises:
            ValueError: if the dataset schema contains a field type with no
                pyarrow mapping (previously this passed a ``None`` dtype to
                pyarrow and failed with a cryptic error).
        """
        payload_fields = []
        for field_name, field_type in dataset.config.schema.items():
            dtype = self.DTYPE_MAPPING.get(field_type)
            if dtype is None:
                raise ValueError(
                    f"Unsupported dataset field type: {field_type!r}"
                )
            payload_fields.append(pa.field(field_name, dtype))

        schema = pa.schema(
            [
                pa.field(
                    "vector",
                    pa.list_(pa.float32(), list_size=dataset.config.vector_size),
                ),
                pa.field("id", pa.int64()),
            ]
            + payload_fields
        )
        self.client.create_table(name=LANCEDB_COLLECTION_NAME, schema=schema)
44 changes: 44 additions & 0 deletions engine/clients/lancedb/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import List, Optional

from engine.base_client import IncompatibilityError
from engine.base_client.parser import BaseConditionParser, FieldValue


class LancedbConditionParser(BaseConditionParser):
    """Translates benchmark meta-conditions into the SQL-like predicate
    strings LanceDB accepts in ``.where(...)``."""

    def build_condition(
        self, and_subfilters: Optional[List[str]], or_subfilters: Optional[List[str]]
    ) -> str:
        """Combine AND/OR sub-filter groups into one predicate.

        Bug fix: the previous implementation concatenated the AND-joined
        and OR-joined groups with no separator between them, producing
        invalid SQL whenever both groups were present.  Each group is now
        parenthesized and the groups are joined with AND.
        """
        clauses: List[str] = []
        if and_subfilters:
            clauses.append("(" + " AND ".join(and_subfilters) + ")")
        if or_subfilters:
            clauses.append("(" + " OR ".join(or_subfilters) + ")")
        return " AND ".join(clauses)

    def build_exact_match_filter(self, field_name: str, value: FieldValue) -> str:
        # Bug fix: string values (the schema maps "keyword"/"text" fields
        # to strings) were interpolated unquoted, producing invalid SQL
        # such as `(city = London)`.  Quote and escape them.
        if isinstance(value, str):
            value = "'" + value.replace("'", "''") + "'"
        return f"({field_name} = {value})"

    def build_range_filter(
        self,
        field_name: str,
        lt: Optional[FieldValue],
        gt: Optional[FieldValue],
        lte: Optional[FieldValue],
        gte: Optional[FieldValue],
    ) -> str:
        # Bug fix: truthiness checks (`if lt:`) silently dropped legitimate
        # zero bounds; compare against None explicitly.
        out = []
        if lt is not None:
            out.append(f"({field_name} < {lt})")
        if gt is not None:
            out.append(f"({field_name} > {gt})")
        if lte is not None:
            out.append(f"({field_name} <= {lte})")
        if gte is not None:
            out.append(f"({field_name} >= {gte})")
        return "(" + " AND ".join(out) + ")"

    def build_geo_filter(
        self, field_name: str, lat: float, lon: float, radius: float
    ) -> str:
        # LanceDB's SQL filters have no geo-radius primitive.
        raise IncompatibilityError
36 changes: 36 additions & 0 deletions engine/clients/lancedb/search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from typing import List, Tuple

import lancedb
from lancedb import DBConnection
from lancedb.query import LanceVectorQueryBuilder

from dataset_reader.base_reader import Query
from engine.base_client.search import BaseSearcher
from engine.clients.lancedb import LancedbConfigurator
from engine.clients.lancedb.config import LANCEDB_COLLECTION_NAME
from engine.clients.lancedb.parser import LancedbConditionParser


class LancedbSearcher(BaseSearcher):
    """Runs ANN queries against the embedded LanceDB table."""

    client: DBConnection = None
    parser = LancedbConditionParser()

    @classmethod
    def init_client(cls, host, distance, connection_params: dict, search_params: dict):
        # LanceDB is embedded: data lives in a local directory, `host` is unused.
        cls.client = lancedb.connect("~/.lancedb", **connection_params)
        cls.distance = distance

    @classmethod
    def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
        """Return the ``top`` nearest ids with their distances."""
        table = cls.client.open_table(name=LANCEDB_COLLECTION_NAME)
        metric = LancedbConfigurator.DISTANCE_MAPPING.get(cls.distance)
        where_clause = cls.parser.parse(query.meta_conditions)

        builder: LanceVectorQueryBuilder = table.search(query.vector)
        builder = builder.metric(metric)
        # prefilter=True applies the payload filter before the ANN search.
        builder = builder.where(where_clause, prefilter=True)
        builder = builder.select(["id"])
        builder = builder.limit(top)

        hits = builder.to_list()
        return [(hit["id"], hit["_distance"]) for hit in hits]
79 changes: 79 additions & 0 deletions engine/clients/lancedb/upload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from typing import List

import lancedb
import pyarrow as pa
from lancedb import DBConnection
from lancedb.table import Table

from dataset_reader.base_reader import Record
from engine.base_client.upload import BaseUploader
from engine.clients.lancedb.config import LANCEDB_COLLECTION_NAME


class LancedbUploader(BaseUploader):
    """Batch-uploads records into the LanceDB table and builds the
    configured IVF-PQ indexes after upload."""

    client: DBConnection = None
    upload_params = {}

    # LanceDB commits can conflict when parallel upload workers write at
    # once; each batch is retried this many times before giving up.
    # (Replaces the magic `i = 15` countdown duplicated in the message.)
    MAX_ADD_ATTEMPTS = 15

    @classmethod
    def init_client(cls, host, distance, connection_params, upload_params):
        # LanceDB is embedded: data lives in a local directory, `host` is unused.
        uri = "~/.lancedb"
        cls.client = lancedb.connect(uri, **connection_params)
        cls.upload_params = upload_params

    @classmethod
    def upload_batch(cls, batch: List[Record]):
        """Append one batch of records, retrying on commit conflicts.

        Raises:
            OSError: if the conflict persists after MAX_ADD_ATTEMPTS tries.
        """
        data = [
            {"vector": point.vector, "id": point.id, **(point.metadata or {})}
            for point in batch
        ]
        tbl = cls.client.open_table(name=LANCEDB_COLLECTION_NAME)

        for attempt in range(1, cls.MAX_ADD_ATTEMPTS + 1):
            try:
                tbl.add(data)
                return
            except OSError as e:
                # https://lancedb.github.io/lance/format.html#conflict-resolution
                if attempt == cls.MAX_ADD_ATTEMPTS:
                    raise OSError(
                        f"After {cls.MAX_ADD_ATTEMPTS} attempts, "
                        "the conflict could not be resolved."
                    ) from e

    @staticmethod
    def _closest_divisor(a: int, b: int) -> int:
        # Among all divisors of `a`, return the one closest to `b`.
        divisors = [i for i in range(1, a + 1) if a % i == 0]
        return min(divisors, key=lambda d: abs(d - b))

    @classmethod
    def get_correct_num_sub_vectors(cls, num_sub_vectors: int, tbl: Table):
        """Clamp ``num_sub_vectors`` to a divisor of the vector dimension.

        LanceDB requires num_sub_vectors to divide the vector dimension
        (otherwise: "num_sub_vectors must divide vector dimension 100, but
        got 64").  The dimension is read from the table schema as a
        workaround for not having dataset.config.vector_size here.
        """
        field = tbl.schema.field("vector")
        if not pa.types.is_fixed_size_list(field.type):
            # Non-fixed-size vector column: dimension unknown, pass through.
            return num_sub_vectors
        list_type: pa.FixedSizeListType = field.type
        return cls._closest_divisor(list_type.list_size, num_sub_vectors)

    @classmethod
    def post_upload(cls, _distance):
        """Create and train the configured IVF-PQ indexes.

        The table needs enough rows for an effective training step, which
        is why index creation happens after upload.
        """
        tbl = cls.client.open_table(name=LANCEDB_COLLECTION_NAME)

        for index in cls.upload_params.get("indices", []):
            num_sub_vectors = cls.get_correct_num_sub_vectors(
                index["num_sub_vectors"], tbl
            )
            # NOTE(review): `_distance` is passed to create_index raw, while
            # the searcher maps it through LancedbConfigurator.DISTANCE_MAPPING
            # ("l2"/"cosine"/"dot") before use — confirm create_index accepts
            # the unmapped value.
            tbl.create_index(
                num_partitions=index["num_partitions"],
                num_sub_vectors=num_sub_vectors,
                metric=_distance,
            )
        return {}
76 changes: 76 additions & 0 deletions experiments/configurations/lancedb-single-node.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
[
{
"name": "lancedb-default",
"engine": "lancedb",
"connection_params": {},
"collection_params": {
},
"search_params": [
{
"parallel": 8,
"config": {}
}
],
"upload_params": {
"parallel": 16,
"batch_size": 1024
}
},
{
"name": "lancedb-p-256-sv-64",
"engine": "lancedb",
"connection_params": {},
"collection_params": {},
"search_params": [
{ "parallel": 1, "config": {} },
{ "parallel": 100, "config": {} }
],
"upload_params": {
"parallel": 16,
"indices": [
{
"num_partitions": 256,
"num_sub_vectors": 64
}
]
}
},
{
"name": "lancedb-p-256-sv-16",
"engine": "lancedb",
"connection_params": {},
"collection_params": {},
"search_params": [
{ "parallel": 1, "config": {} },
{ "parallel": 100, "config": {} }
],
"upload_params": {
"parallel": 16,
"indices": [
{
"num_partitions": 256,
"num_sub_vectors": 16
}
]
}
},
{
"name": "lancedb-p-512-sv-8",
"engine": "lancedb",
"connection_params": {},
"collection_params": {},
"search_params": [
{ "parallel": 1, "config": {} },
{ "parallel": 100, "config": {} }
],
"upload_params": {
"parallel": 16,
"indices": [
{
"num_partitions": 512,
"num_sub_vectors": 8
}
]
}
}
]
17 changes: 17 additions & 0 deletions experiments/configurations/lancedb-single-node.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# LanceDB Parameters

See https://lancedb.github.io/lancedb/ann_indexes/#how-to-choose-num_partitions-and-num_sub_vectors-for-ivf_pq-index

## search_params
"parallel": 1,8,100,
"config": {}

## upload_params
"parallel": 16,
"batch_size": 1024,
"indices": [
{
"num_partitions": 256,512,
"num_sub_vectors": 8,16,64
}
]
Loading