A python SDK for OceanBase Vector Store, based on SQLAlchemy, compatible with Milvus API.
- git clone this repo, then install with:
poetry install
- install with pip:
pip install pyobvector==0.1.17
You can build document locally with sphinx
:
mkdir build
make html
pyobvector
supports two modes:
Milvus compatible mode
: You can use theMilvusLikeClient
class to use vector storage in a way similar to the Milvus APISQLAlchemy hybrid mode
: You can use the vector storage function provided by theObVecClient
class and execute the relational database statement with the SQLAlchemy library. In this mode, you can regardpyobvector
as an extension of SQLAlchemy.
Refer to tests/test_milvus_like_client.py
for more examples.
A simple workflow to perform ANN search with OceanBase Vector Store:
- setup a client:
from pyobvector import *
client = MilvusLikeClient(uri="127.0.0.1:2881", user="test@test")
- create a collection with vector index:
test_collection_name = "ann_test"
# define the schema of collection with optional partitions
range_part = ObRangePartition(False, range_part_infos = [
RangeListPartInfo('p0', 100),
RangeListPartInfo('p1', 'maxvalue'),
], range_expr='id')
schema = client.create_schema(partitions=range_part)
# define field schema of collection
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=3)
schema.add_field(field_name="meta", datatype=DataType.JSON, nullable=True)
# define index parameters
idx_params = self.client.prepare_index_params()
idx_params.add_index(
field_name='embedding',
index_type=VecIndexType.HNSW,
index_name='vidx',
metric_type="L2",
params={"M": 16, "efConstruction": 256},
)
# create collection
client.create_collection(
collection_name=test_collection_name,
schema=schema,
index_params=idx_params,
)
- insert data to your collection:
# prepare
vector_value1 = [0.748479,0.276979,0.555195]
vector_value2 = [0, 0, 0]
data1 = [{'id': i, 'embedding': vector_value1} for i in range(10)]
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(10, 13)])
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(111, 113)])
# insert data
client.insert(collection_name=test_collection_name, data=data1)
- do ann search:
res = client.search(collection_name=test_collection_name, data=[0,0,0], anns_field='embedding', limit=5, output_fields=['id'])
# For example, the result will be:
# [{'id': 112}, {'id': 111}, {'id': 10}, {'id': 11}, {'id': 12}]
- setup a client:
from pyobvector import *
from sqlalchemy import Column, Integer, JSON
from sqlalchemy import func
client = ObVecClient(uri="127.0.0.1:2881", user="test@test")
- create a partitioned table with vector index:
# create partitioned table
range_part = ObRangePartition(False, range_part_infos = [
RangeListPartInfo('p0', 100),
RangeListPartInfo('p1', 'maxvalue'),
], range_expr='id')
cols = [
Column('id', Integer, primary_key=True, autoincrement=False),
Column('embedding', VECTOR(3)),
Column('meta', JSON)
]
client.create_table(test_collection_name, columns=cols, partitions=range_part)
# create vector index
client.create_index(
test_collection_name,
is_vec_index=True,
index_name='vidx',
column_names=['embedding'],
vidx_params='distance=l2, type=hnsw, lib=vsag',
)
- insert data to your collection:
# insert data
vector_value1 = [0.748479,0.276979,0.555195]
vector_value2 = [0, 0, 0]
data1 = [{'id': i, 'embedding': vector_value1} for i in range(10)]
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(10, 13)])
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(111, 113)])
client.insert(test_collection_name, data=data1)
- do ann search:
# perform ann search
res = self.client.ann_search(
test_collection_name,
vec_data=[0,0,0],
vec_column_name='embedding',
distance_func=l2_distance,
topk=5,
output_column_names=['id']
)
# For example, the result will be:
# [(112,), (111,), (10,), (11,), (12,)]
- If you want to use pure
SQLAlchemy
API withOceanBase
dialect, you can just get anSQLAlchemy.engine
viaclient.engine
. The engine can also be created as following:
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy import create_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.oceanbase", "pyobvector.schema.dialect", "OceanBaseDialect")
connection_str = (
f"mysql+oceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_engine(connection_str, **kwargs)
- Async engine is also supported:
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy.ext.asyncio import create_async_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.aoceanbase", "pyobvector", "AsyncOceanBaseDialect")
connection_str = (
f"mysql+aoceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_async_engine(connection_str)
- For further usage in pure
SQLAlchemy
mode, please refer to SQLAlchemy