From ee8be5c7ccbe9b153cd5443a226917e6e79f646f Mon Sep 17 00:00:00 2001
From: Tarun Makkar
Date: Sun, 31 Mar 2024 14:59:28 +0530
Subject: [PATCH 1/9] Creating PostgresDataBackend from IbisDataBackend for
 pgvector implementation

---
 CHANGELOG.md                            |   1 +
 superduperdb/backends/base/backends.py  |   2 +
 .../backends/postgres/data_backend.py   | 106 ++++++++++++++++++
 superduperdb/base/build.py              |  15 +++
 superduperdb/base/superduper.py         |   3 +
 5 files changed, 127 insertions(+)
 create mode 100644 superduperdb/backends/postgres/data_backend.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8d442a6a8..d113b1f72 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -32,6 +32,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Rename `_Predictor` to `Model`
 - Allow developers to write `Listeners` and `Graph` in a single formalism
 - Change unittesting framework to pure configuration (no patching configs)
+- Add `PostgresDataBackend` for `pgvector` integration
 
 #### Bug Fixes
 - Fixed a bug in refresh_after_insert for listeners with select None

diff --git a/superduperdb/backends/base/backends.py b/superduperdb/backends/base/backends.py
index 2d653f532..8a4038fca 100644
--- a/superduperdb/backends/base/backends.py
+++ b/superduperdb/backends/base/backends.py
@@ -2,6 +2,7 @@
 from pymongo import MongoClient
 
 from superduperdb.backends.ibis.data_backend import IbisDataBackend
+from superduperdb.backends.postgres.data_backend import PostgresDataBackend
 from superduperdb.backends.local.artifacts import FileSystemArtifactStore
 from superduperdb.backends.mongodb.artifacts import MongoArtifactStore
 from superduperdb.backends.mongodb.data_backend import MongoDataBackend
@@ -14,6 +15,7 @@
 data_backends = {
     'mongodb': MongoDataBackend,
     'ibis': IbisDataBackend,
+    'postgres': PostgresDataBackend,
 }
 
 artifact_stores = {

diff --git a/superduperdb/backends/postgres/data_backend.py b/superduperdb/backends/postgres/data_backend.py
new file mode 100644
index 000000000..f20c78848
--- /dev/null
+++ b/superduperdb/backends/postgres/data_backend.py
@@ -0,0 +1,106 @@
+from superduperdb.backends.ibis.data_backend import IbisDataBackend
+
+
+import typing as t
+from warnings import warn
+
+import ibis
+import pandas
+from ibis.backends.base import BaseBackend
+
+from superduperdb.backends.ibis.db_helper import get_db_helper
+from superduperdb.backends.ibis.field_types import FieldType, dtype
+from superduperdb.backends.ibis.query import Table
+from superduperdb.backends.local.artifacts import FileSystemArtifactStore
+from superduperdb.backends.sqlalchemy.metadata import SQLAlchemyMetadata
+from superduperdb.components.datatype import DataType
+from superduperdb.components.schema import Schema
+
+BASE64_PREFIX = 'base64:'
+INPUT_KEY = '_input_id'
+
+
+
+
+class PostgresDataBackend(IbisDataBackend):
+    def __init__(self, conn: BaseBackend, name: str, in_memory: bool = False):
+        super().__init__(conn=conn, name=name)
+        self.in_memory = in_memory
+        self.dialect = getattr(conn, 'name', 'base')
+        self.db_helper = get_db_helper(self.dialect)
+
+    def url(self):
+        return self.conn.con.url + self.name
+
+    def build_artifact_store(self):
+        return FileSystemArtifactStore(conn='.superduperdb/artifacts/', name='ibis')
+
+    def build_metadata(self):
+        return SQLAlchemyMetadata(conn=self.conn.con, name='ibis')
+
+    def create_ibis_table(self, identifier: str, schema: Schema):
+        self.conn.create_table(identifier, schema=schema)
+
+    def insert(self, table_name, raw_documents):
+        for doc in raw_documents:
+            for k, v in 
doc.items(): + doc[k] = self.db_helper.convert_data_format(v) + table_name, raw_documents = self.db_helper.process_before_insert( + table_name, raw_documents + ) + if not self.in_memory: + self.conn.insert(table_name, raw_documents) + else: + self.conn.create_table(table_name, pandas.DataFrame(raw_documents)) + + def create_output_dest( + self, predict_id: str, datatype: t.Union[FieldType, DataType] + ): + msg = ( + "Model must have an encoder to create with the" + f" {type(self).__name__} backend." + ) + assert datatype is not None, msg + if isinstance(datatype, FieldType): + output_type = dtype(datatype.identifier) + else: + output_type = datatype + fields = { + INPUT_KEY: dtype('string'), + 'output': output_type, + } + return Table( + identifier=f'_outputs.{predict_id}', + schema=Schema(identifier=f'_schema/{predict_id}', fields=fields), + ) + + def create_table_and_schema(self, identifier: str, mapping: dict): + """ + Create a schema in the data-backend. + """ + + try: + mapping = self.db_helper.process_schema_types(mapping) + t = self.conn.create_table(identifier, schema=ibis.schema(mapping)) + except Exception as e: + if 'exists' in str(e): + warn("Table already exists, skipping...") + t = self.conn.table(identifier) + else: + raise e + return t + + def drop(self, force: bool = False): + raise NotImplementedError( + "Dropping tables needs to be done in each DB natively" + ) + + def get_table_or_collection(self, identifier): + return self.conn.table(identifier) + + def disconnect(self): + """ + Disconnect the client + """ + + # TODO: implement me \ No newline at end of file diff --git a/superduperdb/base/build.py b/superduperdb/base/build.py index b91832651..1a665c5c8 100644 --- a/superduperdb/base/build.py +++ b/superduperdb/base/build.py @@ -45,6 +45,7 @@ def _build_metadata(cfg, databackend: t.Optional['BaseDataBackend'] = None): if metadata is None: try: + print(metadata_stores) # try to connect to the data backend uri. 
logging.info("Connecting to Metadata Client with URI: ", cfg.data_backend) return _build_databackend_impl( @@ -117,6 +118,19 @@ def _build_databackend_impl(uri, mapping, type: str = 'data_backend'): name = uri.split('/')[-1] conn = mongomock.MongoClient() return mapping['mongodb'](conn, name) + + elif uri.startswith('postgres://') or uri.startswith("postgresql://"): + name = uri.split('//')[0] + if type == 'data_backend': + ibis_conn = ibis.connect(uri) + print(mapping['postgres']) + return mapping['postgres'](ibis_conn, name) + else: + assert type == 'metadata' + from sqlalchemy import create_engine + + sql_conn = create_engine(uri) + return mapping['sqlalchemy'](sql_conn, name) elif uri.endswith('.csv'): if type == 'metadata': @@ -135,6 +149,7 @@ def _build_databackend_impl(uri, mapping, type: str = 'data_backend'): name = uri.split('//')[0] if type == 'data_backend': ibis_conn = ibis.connect(uri) + print(mapping['ibis']) return mapping['ibis'](ibis_conn, name) else: assert type == 'metadata' diff --git a/superduperdb/base/superduper.py b/superduperdb/base/superduper.py index 9a2363be2..ed2e603e8 100644 --- a/superduperdb/base/superduper.py +++ b/superduperdb/base/superduper.py @@ -35,6 +35,9 @@ def _auto_identify_connection_string(item: str, **kwargs) -> t.Any: elif item.startswith('mongodb+srv://') and 'mongodb.net' in item: kwargs['data_backend'] = item + elif item.startswith('postgres://') or item.startswith('postgresql://'): + kwargs['data_backend'] = item + elif item.endswith('.csv'): kwargs['data_backend'] = item From 66cb314ec40b6552ea277351f09db2a0f021cfbe Mon Sep 17 00:00:00 2001 From: Tarun Makkar Date: Mon, 1 Apr 2024 13:04:50 +0530 Subject: [PATCH 2/9] adding PostgresVectorSearcher --- superduperdb/backends/base/backends.py | 3 + superduperdb/vector_search/postgres.py | 146 +++++++++++++++++++++++++ 2 files changed, 149 insertions(+) create mode 100644 superduperdb/vector_search/postgres.py diff --git a/superduperdb/backends/base/backends.py b/superduperdb/backends/base/backends.py index 8a4038fca..fe55ab085 100644 --- a/superduperdb/backends/base/backends.py +++ b/superduperdb/backends/base/backends.py @@ -11,6 +11,8 @@ from superduperdb.vector_search.atlas import MongoAtlasVectorSearcher from superduperdb.vector_search.in_memory import InMemoryVectorSearcher from superduperdb.vector_search.lance import LanceVectorSearcher +from superduperdb.vector_search.postgres import PostgresVectorSearcher + data_backends = { 'mongodb': MongoDataBackend, @@ -32,6 +34,7 @@ 'lance': LanceVectorSearcher, 'in_memory': InMemoryVectorSearcher, 'mongodb+srv': MongoAtlasVectorSearcher, + 'postgres': PostgresVectorSearcher } CONNECTIONS = { diff --git a/superduperdb/vector_search/postgres.py b/superduperdb/vector_search/postgres.py new file mode 100644 index 000000000..f0dc3c24b --- /dev/null +++ b/superduperdb/vector_search/postgres.py @@ -0,0 +1,146 @@ +import json +import typing as t +import numpy +from pgvector.psycopg import psycopg, register_vector + + +from superduperdb.vector_search.base import BaseVectorSearcher, VectorItem + + +class PostgresVectorSearcher(BaseVectorSearcher): + """ + Implementation of a vector index using the ``pgvector`` library. 
+    :param identifier: Unique string identifier of the index
+    :param dimensions: Dimension of the vector embeddings stored in the table
+    :param conninfo: connection string to postgres
+    :param h: ``numpy.ndarray`` of vectors to seed the index with
+    :param index: list of IDs corresponding to the seed vectors
+    :param measure: measure to assess similarity
+    """
+
+    def __init__(
+        self,
+        identifier: str,
+        dimensions: int,
+        conninfo: str,
+        h: t.Optional[numpy.ndarray] = None,
+        index: t.Optional[t.List[str]] = None,
+        measure: t.Optional[str] = None,
+    ):
+        self.connection = psycopg.connect(conninfo=conninfo)
+        self.dimensions = dimensions
+        self.identifier = identifier
+        if measure == "l2" or not measure:
+            self.measure_query = "embedding <-> '%s'"
+        elif measure == "dot":
+            self.measure_query = "(embedding <#> '%s') * -1"
+        elif measure == "cosine":
+            self.measure_query = "1 - (embedding <=> '%s')"
+        else:
+            raise NotImplementedError("Unrecognized measure format")
+        with self.connection.cursor() as cursor:
+            cursor.execute('CREATE EXTENSION IF NOT EXISTS vector')
+            cursor.execute(
+                'CREATE TABLE IF NOT EXISTS %s (id varchar, embedding vector(%d))'
+                % (self.identifier, self.dimensions)
+            )
+        register_vector(self.connection)
+        if h:
+            self._create_or_append_to_dataset(h, index)
+
+
+    def __len__(self):
+        with self.connection.cursor() as curr:
+            length = curr.execute(
+                'SELECT COUNT(*) FROM %s' % self.identifier
+            ).fetchone()[0]
+        return length
+
+
+    def _create_or_append_to_dataset(self, vectors, ids):
+        with self.connection.cursor().copy(
+            'COPY %s (id, embedding) FROM STDIN WITH (FORMAT BINARY)' % self.identifier
+        ) as copy:
+            copy.set_types(['varchar', 'vector'])
+            for id_vector, vector in zip(ids, vectors):
+                copy.write_row([id_vector, vector])
+        self.connection.commit()
+
+
+    def add(self, items: t.Sequence[VectorItem]) -> None:
+        """
+        Add items to the index.
+        :param items: t.Sequence of VectorItems
+        """
+        ids = [item.id for item in items]
+        vectors = [item.vector for item in items]
+        self._create_or_append_to_dataset(vectors, ids)
+
+
+    def delete(self, ids: t.Sequence[str]) -> None:
+        """
+        Remove items from the index
+        :param ids: t.Sequence of ids of vectors.
+        """
+        with self.connection.cursor() as curr:
+            for id_vector in ids:
+                curr.execute(
+                    "DELETE FROM %s WHERE id = '%s'" % (self.identifier, id_vector)
+                )
+        self.connection.commit()
+
+
+    def find_nearest_from_id(
+        self,
+        _id,
+        n: int = 100,
+        within_ids: t.Sequence[str] = (),
+    ) -> t.Tuple[t.List[str], t.List[float]]:
+        """
+        Find the nearest vectors to the vector with the given id.
+        :param _id: id of the vector
+        :param n: number of nearest vectors to return
+        """
+        with self.connection.cursor() as curr:
+            curr.execute(
+                """
+                SELECT embedding
+                FROM %s
+                WHERE id = '%s'"""
+                % (self.identifier, _id)
+            )
+            h = curr.fetchone()[0]
+        return self.find_nearest_from_array(h, n, within_ids)
+
+    def find_nearest_from_array(
+        self,
+        h: numpy.typing.ArrayLike,
+        n: int = 100,
+        within_ids: t.Sequence[str] = (),
+    ) -> t.Tuple[t.List[str], t.List[float]]:
+        """
+        Find the nearest vectors to the given vector.
+        :param h: vector
+        :param n: number of nearest vectors to return
+        """
+        h = self.to_numpy(h)[None, :]
+        if len(within_ids) == 0:
+            condition = "1=1"
+        else:
+            within_ids_str = ', '.join([f"'{i}'" for i in within_ids])
+            condition = f"id in ({within_ids_str})"
+        query_search_nearest = f"""
+        SELECT id, {self.measure_query} as distance
+        FROM %s
+        WHERE %s
+        ORDER BY distance
+        LIMIT %d
+        """
+        with self.connection.cursor() as curr:
+            curr.execute(
+                query_search_nearest % (json.dumps(h), self.identifier, condition, n)
+            )
+            nearest_items = curr.fetchall()
+        ids = [row[0] for row in nearest_items]
+        scores = [row[1] for row in nearest_items]
+        return ids, scores
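For orientation, here is the intended call pattern of the searcher added above. This is an illustrative sketch only, not part of the patch: it assumes a reachable Postgres with the pgvector extension, a throwaway DSN, and direct construction of `VectorItem` from `superduperdb.vector_search.base`.

```python
import numpy

from superduperdb.vector_search.base import VectorItem
from superduperdb.vector_search.postgres import PostgresVectorSearcher

# hypothetical DSN; any pgvector-enabled database works
searcher = PostgresVectorSearcher(
    identifier='my_vectors',                      # name of the backing table
    dimensions=3,                                 # width of the vector(3) column
    conninfo='postgresql://localhost:5432/test',
    measure='cosine',                             # selects the <=> operator
)
searcher.add([VectorItem(id='a', vector=numpy.array([0.1, 0.2, 0.3]))])
ids, scores = searcher.find_nearest_from_array([0.1, 0.2, 0.3], n=1)
```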
From 340415345847c08b64dc67c27ee45e25d0a4f88f Mon Sep 17 00:00:00 2001
From: Tarun Makkar
Date: Tue, 2 Apr 2024 15:53:50 +0530
Subject: [PATCH 3/9] making requested changes

---
 superduperdb/base/build.py             |  3 ---
 superduperdb/vector_search/postgres.py | 23 ++++++++++++++---------
 2 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/superduperdb/base/build.py b/superduperdb/base/build.py
index 1a665c5c8..ff974a1db 100644
--- a/superduperdb/base/build.py
+++ b/superduperdb/base/build.py
@@ -45,7 +45,6 @@ def _build_metadata(cfg, databackend: t.Optional['BaseDataBackend'] = None):
 
     if metadata is None:
         try:
-            print(metadata_stores)
             # try to connect to the data backend uri.
             logging.info("Connecting to Metadata Client with URI: ", cfg.data_backend)
             return _build_databackend_impl(
@@ -123,7 +122,6 @@ def _build_databackend_impl(uri, mapping, type: str = 'data_backend'):
         name = uri.split('//')[0]
         if type == 'data_backend':
             ibis_conn = ibis.connect(uri)
-            print(mapping['postgres'])
             return mapping['postgres'](ibis_conn, name)
         else:
             assert type == 'metadata'
@@ -149,7 +147,6 @@ def _build_databackend_impl(uri, mapping, type: str = 'data_backend'):
         name = uri.split('//')[0]
         if type == 'data_backend':
             ibis_conn = ibis.connect(uri)
-            print(mapping['ibis'])
             return mapping['ibis'](ibis_conn, name)
         else:
             assert type == 'metadata'

diff --git a/superduperdb/vector_search/postgres.py b/superduperdb/vector_search/postgres.py
index f0dc3c24b..79f31febf 100644
--- a/superduperdb/vector_search/postgres.py
+++ b/superduperdb/vector_search/postgres.py
@@ -4,7 +4,7 @@
 from pgvector.psycopg import psycopg, register_vector
 
 
-from superduperdb.vector_search.base import BaseVectorSearcher, VectorItem
+from superduperdb.vector_search.base import BaseVectorSearcher, VectorItem, VectorIndexMeasureType
 
 
 class PostgresVectorSearcher(BaseVectorSearcher):
@@ -30,14 +30,9 @@ def __init__(
         self.connection = psycopg.connect(conninfo=conninfo)
         self.dimensions = dimensions
         self.identifier = identifier
-        if measure == "l2" or not measure:
-            self.measure_query = "embedding <-> '%s'"
-        elif measure == "dot":
-            self.measure_query = "(embedding <#> '%s') * -1"
-        elif measure == "cosine":
-            self.measure_query = "1 - (embedding <=> '%s')"
-        else:
-            raise NotImplementedError("Unrecognized measure format")
+        self.measure: VectorIndexMeasureType = VectorIndexMeasureType.cosine
+        self.measure_query = self.get_measure_query()
+
         with self.connection.cursor() as cursor:
             cursor.execute('CREATE EXTENSION IF NOT EXISTS vector')
             cursor.execute(
@@ -55,6 +50,16 @@ def __len__(self):
                 'SELECT COUNT(*) FROM %s' % self.identifier
             ).fetchone()[0]
         return length
+
+    def get_measure_query(self):
+        if self.measure.value == "l2":
+            return "embedding <-> '%s'"
+        elif self.measure.value == "dot":
+            return "(embedding <#> '%s') * -1"
+        elif self.measure.value == "cosine":
+            return "1 - (embedding <=> '%s')"
+        else:
+            raise NotImplementedError("Unrecognized measure format")
 
 
     def _create_or_append_to_dataset(self, vectors, ids):
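The refactor above routes all measures through `get_measure_query`. For reference, the mapping onto pgvector's distance operators (a sketch; `embedding` is the column created in `__init__`, and each `'%s'` receives the query-vector literal):

```python
# measure -> SQL expression ordered ascending as "distance"
PGVECTOR_MEASURES = {
    'l2': "embedding <-> '%s'",            # <->  Euclidean (L2) distance
    'dot': "(embedding <#> '%s') * -1",    # <#>  negative inner product, flipped back
    'cosine': "1 - (embedding <=> '%s')",  # <=>  cosine distance, so this is similarity
}
```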
From e95f5d7f19c55552f234d8d852e84efaf6eae5d2 Mon Sep 17 00:00:00 2001
From: Tarun Makkar
Date: Sun, 7 Apr 2024 02:02:45 +0530
Subject: [PATCH 4/9] add hnsw, ivfflat indexing methods

---
 superduperdb/vector_search/postgres.py | 82 ++++++++++++++++++++----
 1 file changed, 71 insertions(+), 11 deletions(-)

diff --git a/superduperdb/vector_search/postgres.py b/superduperdb/vector_search/postgres.py
index 79f31febf..90c13035b 100644
--- a/superduperdb/vector_search/postgres.py
+++ b/superduperdb/vector_search/postgres.py
@@ -6,6 +6,40 @@
 from superduperdb.vector_search.base import BaseVectorSearcher, VectorItem, VectorIndexMeasureType
 
 
+class PostgresIndexing:
+    cosine = "vector_cosine_ops"
+    l2 = "vector_l2_ops"
+    inner_product = "vector_ip_ops"
+
+
+class IVFFlat(PostgresIndexing):
+    """
+    An IVFFlat index divides vectors into lists, and then searches a subset of those lists that are closest to the query vector.
+    It has faster build times and uses less memory than HNSW, but has lower query performance (in terms of speed-recall tradeoff).
+
+    :param lists: number of inverted lists (clusters) the vectors are partitioned into
+    :param probes: number of lists searched at query time
+    """
+    def __init__(self, lists: t.Optional[int] = 100, probes: t.Optional[int] = 1):
+        self.name = "ivfflat"
+        self.lists = lists
+        self.probes = probes
+
+class HNSW(PostgresIndexing):
+    """
+    An HNSW index creates a multilayer graph. It has better query performance than IVFFlat (in terms of speed-recall tradeoff),
+    but has slower build times and uses more memory. Also, an index can be created without any data in the table
+    since there isn't a training step like IVFFlat.
+
+    :param m: the max number of connections per layer
+    :param ef_construction: the size of the dynamic candidate list for constructing the graph
+    :param ef_search: the size of the dynamic candidate list used during search
+    """
+    def __init__(self, m: t.Optional[int] = 16, ef_construction: t.Optional[int] = 64, ef_search: t.Optional[int] = 40):
+        self.name = "hnsw"
+        self.m = m
+        self.ef_construction = ef_construction
+        self.ef_search = ef_search
+
 
 class PostgresVectorSearcher(BaseVectorSearcher):
     """
@@ -22,16 +56,20 @@ def __init__(
         self,
         identifier: str,
         dimensions: int,
-        conninfo: str,
+        uri: str,
         h: t.Optional[numpy.ndarray] = None,
         index: t.Optional[t.List[str]] = None,
-        measure: t.Optional[str] = None,
+        measure: t.Optional[str] = VectorIndexMeasureType.cosine,
+        indexing : t.Optional[HNSW | IVFFlat] = None,
+        indexing_measure : t.Optional[PostgresIndexing] = PostgresIndexing.cosine
     ):
-        self.connection = psycopg.connect(conninfo=conninfo)
+        self.connection = psycopg.connect(uri)
         self.dimensions = dimensions
         self.identifier = identifier
-        self.measure: VectorIndexMeasureType = VectorIndexMeasureType.cosine
+        self.measure = measure
         self.measure_query = self.get_measure_query()
+        self.indexing = indexing
+        self.indexing_measure = indexing_measure
 
         with self.connection.cursor() as cursor:
             cursor.execute('CREATE EXTENSION IF NOT EXISTS vector')
@@ -63,12 +101,31 @@ def get_measure_query(self):
 
 
     def _create_or_append_to_dataset(self, vectors, ids):
-        with self.connection.cursor().copy(
-            'COPY %s (id, embedding) FROM STDIN WITH (FORMAT BINARY)' % self.identifier
-        ) as copy:
-            copy.set_types(['varchar', 'vector'])
-            for id_vector, vector in zip(ids, vectors):
-                copy.write_row([id_vector, vector])
+        with self.connection.cursor() as cursor:
+            for id_, vector in zip(ids, vectors):
+                try:
+                    cursor.execute(
+                        "INSERT INTO %s (id, embedding) VALUES (%s, '%s');" % (self.identifier, id_, vector)
+                    )
+                except Exception as e:
+                    pass
         self.connection.commit()
+
+    def _create_index(self):
+        with self.connection.cursor() as cursor:
+            if self.indexing.name == 'hnsw':
+                cursor.execute("""CREATE INDEX ON %s
+                USING %s (embedding %s)
+                WITH (m = %s, ef_construction = %s);""" % (self.identifier, self.indexing.name, self.indexing_measure, self.indexing.m, self.indexing.ef_construction))
+
+                cursor.execute("""SET %s.ef_search = %s;""" % (self.indexing.name, self.indexing.ef_search))
+            elif self.indexing.name == 'ivfflat':
+                cursor.execute("""CREATE INDEX ON %s
+                USING %s (embedding %s)
+                WITH (lists = %s);""" % (self.identifier, self.indexing.name, self.indexing_measure, self.indexing.lists))
+
+                cursor.execute("""SET %s.probes = %s;""" % (self.indexing.name, self.indexing.probes))
+
         self.connection.commit()
@@ -81,6 +138,9 @@ def add(self, items: t.Sequence[VectorItem]) -> None:
         vectors = [item.vector for item in items]
         self._create_or_append_to_dataset(vectors, ids)
 
+        if self.indexing:
+            self._create_index()
+
 
     def delete(self, ids: t.Sequence[str]) -> None:
         """
@@ -128,7 +188,7 @@ def find_nearest_from_array(
         :param h: vector
         :param n: number of nearest vectors to return
         """
-        h = self.to_numpy(h)[None, :]
+        # h = self.to_numpy(h)[None, :]
         if len(within_ids) == 0:
             condition = "1=1"
         else:
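With the defaults above, `_create_index` ends up issuing DDL along the following lines; `docs` stands in for `self.identifier` and `vector_cosine_ops` for the chosen `indexing_measure` (an illustrative sketch, not output captured from the patch):

```python
from superduperdb.vector_search.postgres import HNSW, IVFFlat

# graph-based index: better speed/recall trade-off, slower builds, more memory
hnsw = HNSW(m=16, ef_construction=64, ef_search=40)
# -> CREATE INDEX ON docs USING hnsw (embedding vector_cosine_ops)
#    WITH (m = 16, ef_construction = 64);
#    SET hnsw.ef_search = 40;

# clustering-based index: faster builds, less memory, needs rows to cluster
ivfflat = IVFFlat(lists=100, probes=1)
# -> CREATE INDEX ON docs USING ivfflat (embedding vector_cosine_ops)
#    WITH (lists = 100);
#    SET ivfflat.probes = 1;
```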
From a70f95ffdad8908124f7e357559b637da96a5eeb Mon Sep 17 00:00:00 2001
From: Tarun Makkar
Date: Thu, 11 Apr 2024 14:32:11 +0530
Subject: [PATCH 5/9] removing PostgresDataBackend

---
 superduperdb/backends/base/backends.py  |   2 -
 .../backends/postgres/data_backend.py   | 106 ------------------
 superduperdb/base/build.py              |   2 +-
 superduperdb/vector_search/postgres.py  |  55 ++++++++-
 4 files changed, 51 insertions(+), 114 deletions(-)
 delete mode 100644 superduperdb/backends/postgres/data_backend.py

diff --git a/superduperdb/backends/base/backends.py b/superduperdb/backends/base/backends.py
index fe55ab085..9f1825a70 100644
--- a/superduperdb/backends/base/backends.py
+++ b/superduperdb/backends/base/backends.py
@@ -2,7 +2,6 @@
 from pymongo import MongoClient
 
 from superduperdb.backends.ibis.data_backend import IbisDataBackend
-from superduperdb.backends.postgres.data_backend import PostgresDataBackend
 from superduperdb.backends.local.artifacts import FileSystemArtifactStore
 from superduperdb.backends.mongodb.artifacts import MongoArtifactStore
 from superduperdb.backends.mongodb.data_backend import MongoDataBackend
@@ -17,7 +16,6 @@
 data_backends = {
     'mongodb': MongoDataBackend,
     'ibis': IbisDataBackend,
-    'postgres': PostgresDataBackend,
 }
 
 artifact_stores = {

diff --git a/superduperdb/backends/postgres/data_backend.py b/superduperdb/backends/postgres/data_backend.py
deleted file mode 100644
index f20c78848..000000000
--- a/superduperdb/backends/postgres/data_backend.py
+++ /dev/null
@@ -1,106 +0,0 @@
-from superduperdb.backends.ibis.data_backend import IbisDataBackend
-
-
-import typing as t
-from warnings import warn
-
-import ibis
-import pandas
-from ibis.backends.base import BaseBackend
-
-from superduperdb.backends.ibis.db_helper import get_db_helper
-from superduperdb.backends.ibis.field_types import FieldType, dtype
-from superduperdb.backends.ibis.query import Table
-from superduperdb.backends.local.artifacts import FileSystemArtifactStore
-from superduperdb.backends.sqlalchemy.metadata import SQLAlchemyMetadata
-from superduperdb.components.datatype import DataType
-from superduperdb.components.schema import Schema
-
-BASE64_PREFIX = 'base64:'
-INPUT_KEY = '_input_id'
-
-
-
-class PostgresDataBackend(IbisDataBackend): - def __init__(self, conn: BaseBackend, name: str, in_memory: bool = False): - super().__init__(conn=conn, name=name) - self.in_memory = in_memory - self.dialect = getattr(conn, 'name', 'base') - self.db_helper = get_db_helper(self.dialect) - - def url(self): - return self.conn.con.url + self.name - - def build_artifact_store(self): - return FileSystemArtifactStore(conn='.superduperdb/artifacts/', name='ibis') - - def build_metadata(self): - return SQLAlchemyMetadata(conn=self.conn.con, name='ibis') - - def create_ibis_table(self, identifier: str, schema: Schema): - self.conn.create_table(identifier, schema=schema) - - def insert(self, table_name, raw_documents): - for doc in raw_documents: - for k, v in doc.items(): - doc[k] = self.db_helper.convert_data_format(v) - table_name, raw_documents = self.db_helper.process_before_insert( - table_name, raw_documents - ) - if not self.in_memory: - self.conn.insert(table_name, raw_documents) - else: - self.conn.create_table(table_name, pandas.DataFrame(raw_documents)) - - def create_output_dest( - self, predict_id: str, datatype: t.Union[FieldType, DataType] - ): - msg = ( - "Model must have an encoder to create with the" - f" {type(self).__name__} backend." - ) - assert datatype is not None, msg - if isinstance(datatype, FieldType): - output_type = dtype(datatype.identifier) - else: - output_type = datatype - fields = { - INPUT_KEY: dtype('string'), - 'output': output_type, - } - return Table( - identifier=f'_outputs.{predict_id}', - schema=Schema(identifier=f'_schema/{predict_id}', fields=fields), - ) - - def create_table_and_schema(self, identifier: str, mapping: dict): - """ - Create a schema in the data-backend. - """ - - try: - mapping = self.db_helper.process_schema_types(mapping) - t = self.conn.create_table(identifier, schema=ibis.schema(mapping)) - except Exception as e: - if 'exists' in str(e): - warn("Table already exists, skipping...") - t = self.conn.table(identifier) - else: - raise e - return t - - def drop(self, force: bool = False): - raise NotImplementedError( - "Dropping tables needs to be done in each DB natively" - ) - - def get_table_or_collection(self, identifier): - return self.conn.table(identifier) - - def disconnect(self): - """ - Disconnect the client - """ - - # TODO: implement me \ No newline at end of file diff --git a/superduperdb/base/build.py b/superduperdb/base/build.py index ff974a1db..b8fcef91a 100644 --- a/superduperdb/base/build.py +++ b/superduperdb/base/build.py @@ -122,7 +122,7 @@ def _build_databackend_impl(uri, mapping, type: str = 'data_backend'): name = uri.split('//')[0] if type == 'data_backend': ibis_conn = ibis.connect(uri) - return mapping['postgres'](ibis_conn, name) + return mapping['ibis'](ibis_conn, name) else: assert type == 'metadata' from sqlalchemy import create_engine diff --git a/superduperdb/vector_search/postgres.py b/superduperdb/vector_search/postgres.py index 90c13035b..2d69d8e09 100644 --- a/superduperdb/vector_search/postgres.py +++ b/superduperdb/vector_search/postgres.py @@ -1,7 +1,13 @@ import json import typing as t import numpy -from pgvector.psycopg import psycopg, register_vector +import psycopg2 + +from superduperdb import CFG, logging +if t.TYPE_CHECKING: + from superduperdb.components.vector_index import VectorIndex +from superduperdb.components.model import APIModel, Model + from superduperdb.vector_search.base import BaseVectorSearcher, VectorItem, VectorIndexMeasureType @@ -63,21 +69,22 @@ def __init__( indexing : 
t.Optional[HNSW | IVFFlat] = None,
         indexing_measure : t.Optional[PostgresIndexing] = PostgresIndexing.cosine
     ):
-        self.connection = psycopg.connect(uri)
+        self.connection = psycopg2.connect(uri)
         self.dimensions = dimensions
         self.identifier = identifier
         self.measure = measure
         self.measure_query = self.get_measure_query()
         self.indexing = indexing
         self.indexing_measure = indexing_measure
+        print('creation started')
         with self.connection.cursor() as cursor:
             cursor.execute('CREATE EXTENSION IF NOT EXISTS vector')
             cursor.execute(
-                'CREATE TABLE IF NOT EXISTS %s (id varchar, embedding vector(%d))'
+                'CREATE TABLE IF NOT EXISTS "%s" (id varchar, embedding vector(%d))'
                 % (self.identifier, self.dimensions)
             )
-        register_vector(self.connection)
+        self.connection.commit()
+        print("table created")
         if h:
             self._create_or_append_to_dataset(h, index)
@@ -209,3 +216,41 @@ def find_nearest_from_array(
         ids = [row[0] for row in nearest_items]
         scores = [row[1] for row in nearest_items]
         return ids, scores
+
+    @classmethod
+    def from_component(cls, vi: 'VectorIndex'):
+        from superduperdb.components.listener import Listener
+        from superduperdb.components.model import ObjectModel
+
+        assert isinstance(vi.indexing_listener, Listener)
+        collection = vi.indexing_listener.select.table_or_collection.identifier
+
+        print(collection)
+
+        indexing_key = vi.indexing_listener.key
+
+        print(indexing_key)
+        assert isinstance(
+            indexing_key, str
+        ), 'Only a single key is supported for vector search'
+        if indexing_key.startswith('_outputs'):
+            indexing_key = indexing_key.split('.')[1]
+        assert isinstance(vi.indexing_listener.model, Model) or isinstance(
+            vi.indexing_listener.model, APIModel
+        )
+        assert isinstance(collection, str), 'Collection is required to be a string'
+        indexing_model = vi.indexing_listener.model.identifier
+
+        print(indexing_model)
+        indexing_version = vi.indexing_listener.model.version
+
+        print(indexing_version)
+        output_path = f'_outputs.{vi.indexing_listener.predict_id}'
+        print(output_path)
+
+        return PostgresVectorSearcher(
+            uri="postgresql://postgres:test@localhost:8000/qa",
+            identifier=output_path,
+            dimensions=vi.dimensions,
+            measure=VectorIndexMeasureType.cosine,
+        )
\ No newline at end of file

From 2e344aa168c723fca0cf0bf690c5f5eceec111d4 Mon Sep 17 00:00:00 2001
From: Tarun Makkar
Date: Thu, 11 Apr 2024 14:35:18 +0530
Subject: [PATCH 6/9] removing unnecessary statements

---
 superduperdb/vector_search/postgres.py | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/superduperdb/vector_search/postgres.py b/superduperdb/vector_search/postgres.py
index 2d69d8e09..6a7db7de1 100644
--- a/superduperdb/vector_search/postgres.py
+++ b/superduperdb/vector_search/postgres.py
@@ -76,7 +76,6 @@ def __init__(
         self.measure_query = self.get_measure_query()
         self.indexing = indexing
         self.indexing_measure = indexing_measure
-        print('creation started')
         with self.connection.cursor() as cursor:
             cursor.execute('CREATE EXTENSION IF NOT EXISTS vector')
             cursor.execute(
@@ -84,7 +83,6 @@ def __init__(
             % (self.identifier, self.dimensions)
         )
         self.connection.commit()
-        print("table created")
         if h:
             self._create_or_append_to_dataset(h, index)
@@ -225,11 +223,9 @@ def from_component(cls, vi: 'VectorIndex'):
         assert isinstance(vi.indexing_listener, Listener)
         collection = vi.indexing_listener.select.table_or_collection.identifier
 
-        print(collection)
 
         indexing_key = vi.indexing_listener.key
 
-        print(indexing_key)
         assert isinstance(
             indexing_key, str
         ), 'Only a single key is supported for vector search'
@@ -241,12 +237,9 @@ def from_component(cls, vi: 'VectorIndex'):
         assert isinstance(collection, str), 'Collection is required to be a string'
         indexing_model = vi.indexing_listener.model.identifier
 
-        print(indexing_model)
         indexing_version = vi.indexing_listener.model.version
 
-        print(indexing_version)
         output_path = f'_outputs.{vi.indexing_listener.predict_id}'
-        print(output_path)
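At this point the searcher is wired up end to end. The way it gets selected at runtime, as the integration tests and notebook below do, is by pointing the vector-search configuration at the Postgres data backend (the DSN shown is a placeholder):

```python
import superduperdb as s

# route similarity queries through PostgresVectorSearcher
s.CFG.vector_search = s.CFG.data_backend  # e.g. 'postgresql://user:pass@localhost:5432/db'
```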
From 009bdfa33128a19d10372ded5b47c9f0e052ca8f Mon Sep 17 00:00:00 2001
From: Tarun Makkar
Date: Sat, 13 Apr 2024 16:28:29 +0530
Subject: [PATCH 7/9] integration testing

---
 superduperdb/vector_search/postgres.py    |   4 +-
 .../backends/postgres/test_pg_vector.py   | 115 ++++++++++++++++++
 2 files changed, 117 insertions(+), 2 deletions(-)
 create mode 100644 test/integration/backends/postgres/test_pg_vector.py

diff --git a/superduperdb/vector_search/postgres.py b/superduperdb/vector_search/postgres.py
index 6a7db7de1..4a04ceba4 100644
--- a/superduperdb/vector_search/postgres.py
+++ b/superduperdb/vector_search/postgres.py
@@ -79,7 +79,7 @@ def __init__(
         with self.connection.cursor() as cursor:
             cursor.execute('CREATE EXTENSION IF NOT EXISTS vector')
             cursor.execute(
-                'CREATE TABLE IF NOT EXISTS "%s" (id varchar, embedding vector(%d))'
+                'CREATE TABLE IF NOT EXISTS "%s" (id varchar, txt varchar, embedding vector(%d))'
                 % (self.identifier, self.dimensions)
             )
         self.connection.commit()
@@ -242,7 +242,7 @@ def from_component(cls, vi: 'VectorIndex'):
         output_path = f'_outputs.{vi.indexing_listener.predict_id}'
 
         return PostgresVectorSearcher(
-            uri="postgresql://postgres:test@localhost:8000/qa",
+            uri=CFG.data_backend,
             identifier=output_path,
             dimensions=vi.dimensions,
             measure=VectorIndexMeasureType.cosine,
         )

diff --git a/test/integration/backends/postgres/test_pg_vector.py b/test/integration/backends/postgres/test_pg_vector.py
new file mode 100644
index 000000000..032d04338
--- /dev/null
+++ b/test/integration/backends/postgres/test_pg_vector.py
@@ -0,0 +1,115 @@
+import random
+import warnings
+import tempfile
+import ibis
+
+import lorem
+import psycopg2
+import pytest
+
+import superduperdb as s
+from superduperdb import CFG, superduper
+from superduperdb.backends.ibis.data_backend import IbisDataBackend
+from superduperdb.base.datalayer import Datalayer
+from superduperdb.backends.sqlalchemy.metadata import SQLAlchemyMetadata
+from superduperdb.backends.local.artifacts import FileSystemArtifactStore
+from superduperdb.backends.ibis.query import Table
+from superduperdb.base.document import Document
+from superduperdb.components.listener import Listener
+from superduperdb.components.model import ObjectModel
+from superduperdb.components.vector_index import VectorIndex, vector
+from superduperdb.components.schema import Schema
+from superduperdb.backends.ibis.field_types import dtype
+
+# only meaningful against a pgvector-enabled Postgres deployment
+DO_SKIP = not CFG.data_backend.startswith(('postgres://', 'postgresql://'))
+
+
+@pytest.fixture
+def postgres_conn():
+    # assumes a reachable Postgres instance with the pgvector extension;
+    # the temporary directory only backs the artifact store
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        yield ibis.connect(CFG.data_backend), tmp_dir
+
+
+@pytest.fixture
+def test_db(postgres_conn):
+    connection, tmp_dir = postgres_conn
+    yield make_ibis_db(connection, connection, tmp_dir)
+
+
+def make_ibis_db(db_conn, metadata_conn, tmp_dir, in_memory=False):
+    return Datalayer(
+        databackend=IbisDataBackend(conn=db_conn, name='ibis', in_memory=in_memory),
+        metadata=SQLAlchemyMetadata(conn=metadata_conn.con, name='ibis'),
+        artifact_store=FileSystemArtifactStore(conn=tmp_dir, name='ibis'),
+    )
+
+
+def random_vector_model(x):
+    return [random.random() for _ in range(16)]
+
+
+@pytest.fixture()
+def pgvector_search_config():
+    previous = s.CFG.vector_search
+    s.CFG.vector_search = s.CFG.data_backend
+    yield
+    s.CFG.vector_search = previous
+
+
+@pytest.mark.skipif(DO_SKIP, reason='Only pgvector deployments relevant.')
+def test_setup_pgvector_vector_search(pgvector_search_config):
+    model = ObjectModel(
+        identifier='test-model', object=random_vector_model, encoder=vector(shape=(16,))
+    )
+    db = superduper()
+    schema = Schema(
+        identifier='docs-schema',
+        fields={
+            'text': dtype('str'),
+        },
+    )
+    table = Table('docs', schema=schema)
+
+    vector_indexes = db.vector_indices
+
+    assert not vector_indexes
+
+    db.execute(
+        table.insert(
+            [Document({'text': lorem.sentence()}) for _ in range(50)]
+        )
+    )
+    db.add(
+        VectorIndex(
+            'test-vector-index',
+            indexing_listener=Listener(
+                model=model,
+                key='text',
+                select=table.select('text'),
+            ),
+        )
+    )
+
+    assert 'test-vector-index' in db.show('vector_index')
+    assert 'test-vector-index' in db.vector_indices
+
+
+@pytest.mark.skipif(DO_SKIP, reason='Only pgvector deployments relevant.')
+def test_use_pgvector_vector_search(pgvector_search_config):
+    db = superduper()
+    schema = Schema(
+        identifier='docs-schema',
+        fields={
+            'text': dtype('str'),
+        },
+    )
+    table = Table('docs', schema=schema)
+
+    # .select() replaces the Mongo-style .find() on the SQL backend
+    query = table.like(
+        Document({'text': 'This is a test'}), n=5, vector_index='test-vector-index'
+    ).select('text')
+
+    it = 0
+    for r in db.execute(query):
+        print(r)
+        it += 1
+
+    assert it == 5
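The integration tests above assume a reachable pgvector-enabled Postgres; the patch itself does not provision one. A hypothetical local setup (the image, credentials, port, and environment override are assumptions, not taken from the patch):

```sh
docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=test ankane/pgvector
export SUPERDUPERDB_DATA_BACKEND=postgresql://postgres:test@localhost:5432/postgres
pytest test/integration/backends/postgres/test_pg_vector.py
```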
From 4bf72be48668788d2f4bad9872175d0276434d8d Mon Sep 17 00:00:00 2001
From: Tarun Makkar
Date: Thu, 18 Apr 2024 03:16:04 +0530
Subject: [PATCH 8/9] testing and adding pgvector.ipynb notebook

---
 examples/pgvector.ipynb                    | 2501 ++++++++++++++++++++
 superduperdb/backends/base/backends.py     |    2 +-
 superduperdb/backends/ibis/query.py        |   66 +-
 superduperdb/vector_search/interface.py    |   34 +-
 superduperdb/vector_search/postgres.py     |   28 +-
 superduperdb/vector_search/update_tasks.py |   88 +-
 6 files changed, 2629 insertions(+), 90 deletions(-)
 create mode 100644 examples/pgvector.ipynb

diff --git a/examples/pgvector.ipynb b/examples/pgvector.ipynb
new file mode 100644
index 000000000..8fc3b55b3
--- /dev/null
+++ b/examples/pgvector.ipynb
@@ -0,0 +1,2501 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "fc57749f",
+   "metadata": {},
+   "source": [
+    "# Building a Private Q&A Assistant Using Postgres + pgvector and an Open-Source Model"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "cafefc8a",
+   "metadata": {},
+   "source": [
+    "## Prerequisites\n",
+    "\n",
+    "Before starting the implementation, make sure you have the required libraries installed by running the following commands:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "id": "7c56cd0b",
+   "metadata": {
+    "scrolled": true
+   },
+   "outputs": [],
+   "source": [
+    "# !pip install superduperdb\n",
+    "# !pip install vllm\n",
+    "# !pip install sentence_transformers numpy==1.24.4\n",
+    "# !pip install 'ibis-framework[postgres]'\n",
+    "# !pip install pgvector\n",
+    "# !pip install psycopg2 "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "id": "9590a2b1",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!rm -rf .superduperdb/ && mkdir -p .superduperdb"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "c65ffd13",
+   "metadata": {},
+   "source": [
+    "## Connect to datastore \n",
+    "\n",
+    "First, we need to establish a connection to a Postgres datastore via SuperDuperDB. You can configure the Postgres URI based on your specific setup. \n",
+    "Here are some examples of Postgres URIs:\n",
+    "\n",
+    "* Local Postgres instance (default port): `postgresql://localhost:5432/documents`\n",
+    "* Postgres with authentication: `postgresql://superduper:superduper@localhost:5432/documents`\n",
+    "* Hosted/managed Postgres: `postgresql://<username>:<password>@<host>:<port>/<database>`"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "id": "cf641454",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from superduperdb.base.config import VectorSearch, Compute"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "id": "20ac07ae",
+   "metadata": {
+    "scrolled": true
+   },
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "\u001b[32m 2024-Apr-18 02:14:21.59\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.base.build\u001b[0m:\u001b[36m65  \u001b[0m | \u001b[1mData Client is ready. \u001b[0m\n",
+      "\u001b[32m 2024-Apr-18 02:14:21.60\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.base.build\u001b[0m:\u001b[36m30  \u001b[0m | \u001b[1mConnecting to Metadata Client: sqlite:///.superduperdb/metadata.sqlite\u001b[0m\n",
+      "\u001b[32m 2024-Apr-18 02:14:21.62\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.backends.local.artifacts\u001b[0m:\u001b[36m29  \u001b[0m | \u001b[1mCreating artifact store directory\u001b[0m\n",
+      "\u001b[32m 2024-Apr-18 02:14:21.62\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.base.build\u001b[0m:\u001b[36m160 \u001b[0m | \u001b[1mConnecting to compute client: None\u001b[0m\n",
+      "\u001b[32m 2024-Apr-18 02:14:21.62\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.base.datalayer\u001b[0m:\u001b[36m89  \u001b[0m | \u001b[1mBuilding Data Layer\u001b[0m\n"
+     ]
+    }
+   ],
+   "source": [
+    "from superduperdb import superduper\n",
+    "from superduperdb.backends.ibis import Table\n",
+    "import os\n",
+    "from superduperdb.backends.ibis.field_types import dtype\n",
+    "from superduperdb.ext.pillow import pil_image\n",
+    "from superduperdb import Schema\n",
+    "\n",
+    "connection_uri = \"postgresql://postgres:test@localhost:8000/qa\"\n",
+    "\n",
+    "\n",
+    "# It just super dupers your database\n",
+    "db = superduper(\n",
+    "    connection_uri,\n",
+    "    metadata_store='sqlite:///.superduperdb/metadata.sqlite',\n",
+    "#     cluster__vector_search = vs,\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "id": "87a0fdf1",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "```\n",
+      "{\n",
+      "  \"cfg\": {\n",
+      "    \"data_backend\": \"mongodb://localhost:27017/test_db\",\n",
+      "    \"lance_home\": \".superduperdb/vector_indices\",\n",
+      "    \"artifact_store\": null,\n",
+      "    \"metadata_store\": null,\n",
+      "    \"cluster\": {\n",
+      "      \"compute\": {\n",
+      "        \"uri\": null,\n",
+      "        \"compute_kwargs\": {}\n",
+      "      },\n",
+      "      \"vector_search\": {\n",
+      "        \"uri\": \"postgresql://postgres:test@localhost:8000/qa\",\n",
+      "        \"type\": \"pg_vector\",\n",
+      "        \"backfill_batch_size\": 100\n",
+      "      },\n",
+      "      \"cdc\": {\n",
+      "        \"uri\": null,\n",
+      "        \"strategy\": null\n",
+      "      }\n",
+      "    },\n",
+      "    \"retries\": {\n",
+      "      \"stop_after_attempt\": 2,\n",
+      "      \"wait_max\": 10.0,\n",
+      "      \"wait_min\": 4.0,\n",
+      "      \"wait_multiplier\": 1.0\n",
+      "    },\n",
\"downloads\": {\n", + " \"folder\": null,\n", + " \"n_workers\": 0,\n", + " \"headers\": {\n", + " \"User-Agent\": \"me\"\n", + " },\n", + " \"timeout\": null\n", + " },\n", + " \"fold_probability\": 0.05,\n", + " \"log_level\": \"INFO\",\n", + " \"logging_type\": \"SYSTEM\",\n", + " \"bytes_encoding\": \"Bytes\"\n", + " },\n", + " \"cwd\": \"/Users/tarun/Desktop/superduperDB/superduperdb/examples\",\n", + " \"freeze\": [\n", + " \"aiohttp==3.9.3\",\n", + " \"aiohttp-cors==0.7.0\",\n", + " \"aiosignal==1.3.1\",\n", + " \"annotated-types==0.6.0\",\n", + " \"anyio==4.3.0\",\n", + " \"appnope==0.1.4\",\n", + " \"argon2-cffi==23.1.0\",\n", + " \"argon2-cffi-bindings==21.2.0\",\n", + " \"arrow==1.3.0\",\n", + " \"asn1crypto==1.5.1\",\n", + " \"asttokens==2.4.1\",\n", + " \"async-lru==2.0.4\",\n", + " \"atpublic==4.0\",\n", + " \"attrs==23.2.0\",\n", + " \"Babel==2.14.0\",\n", + " \"beautifulsoup4==4.12.3\",\n", + " \"bidict==0.23.1\",\n", + " \"bleach==6.1.0\",\n", + " \"boto3==1.34.69\",\n", + " \"botocore==1.34.69\",\n", + " \"build==1.1.1\",\n", + " \"cachetools==5.3.3\",\n", + " \"certifi==2024.2.2\",\n", + " \"cffi==1.16.0\",\n", + " \"charset-normalizer==3.3.2\",\n", + " \"click==8.1.7\",\n", + " \"cloudpickle==3.0.0\",\n", + " \"colorful==0.5.6\",\n", + " \"comm==0.2.2\",\n", + " \"cryptography==42.0.5\",\n", + " \"dask==2024.3.1\",\n", + " \"debugpy==1.8.1\",\n", + " \"decorator==5.1.1\",\n", + " \"defusedxml==0.7.1\",\n", + " \"dill==0.3.8\",\n", + " \"distlib==0.3.8\",\n", + " \"distributed==2024.3.1\",\n", + " \"dnspython==2.6.1\",\n", + " \"duckdb==0.10.1\",\n", + " \"duckdb_engine==0.11.2\",\n", + " \"executing==2.0.1\",\n", + " \"fastapi==0.110.0\",\n", + " \"fastjsonschema==2.19.1\",\n", + " \"filelock==3.13.2\",\n", + " \"fqdn==1.5.1\",\n", + " \"frozenlist==1.4.1\",\n", + " \"fsspec==2024.3.1\",\n", + " \"ftfy==6.2.0\",\n", + " \"google-api-core==2.18.0\",\n", + " \"google-auth==2.29.0\",\n", + " \"googleapis-common-protos==1.63.0\",\n", + " \"greenlet==3.0.3\",\n", + " \"grpcio==1.62.1\",\n", + " \"h11==0.14.0\",\n", + " \"httpcore==1.0.4\",\n", + " \"httpx==0.27.0\",\n", + " \"huggingface-hub==0.22.0\",\n", + " \"ibis==3.3.0\",\n", + " \"ibis-framework==8.0.0\",\n", + " \"idna==3.6\",\n", + " \"importlib_metadata==7.1.0\",\n", + " \"ipykernel==6.29.3\",\n", + " \"ipython==8.22.2\",\n", + " \"ipython-genutils==0.2.0\",\n", + " \"ipywidgets==8.1.2\",\n", + " \"isoduration==20.11.0\",\n", + " \"jedi==0.19.1\",\n", + " \"Jinja2==3.1.3\",\n", + " \"jmespath==1.0.1\",\n", + " \"joblib==1.3.2\",\n", + " \"json5==0.9.24\",\n", + " \"jsonpointer==2.4\",\n", + " \"jsonschema==4.21.1\",\n", + " \"jsonschema-specifications==2023.12.1\",\n", + " \"jupyter==1.0.0\",\n", + " \"jupyter-console==6.6.3\",\n", + " \"jupyter-events==0.10.0\",\n", + " \"jupyter-lsp==2.2.4\",\n", + " \"jupyter_client==8.6.1\",\n", + " \"jupyter_core==5.7.2\",\n", + " \"jupyter_server==2.13.0\",\n", + " \"jupyter_server_terminals==0.5.3\",\n", + " \"jupyterlab==4.1.5\",\n", + " \"jupyterlab_pygments==0.3.0\",\n", + " \"jupyterlab_server==2.25.4\",\n", + " \"jupyterlab_widgets==3.0.10\",\n", + " \"locket==1.0.0\",\n", + " \"loguru==0.7.2\",\n", + " \"loki-logger-handler==0.1.3\",\n", + " \"markdown-it-py==3.0.0\",\n", + " \"MarkupSafe==2.1.5\",\n", + " \"matplotlib-inline==0.1.6\",\n", + " \"mdurl==0.1.2\",\n", + " \"mistune==3.0.2\",\n", + " \"mongomock==4.1.2\",\n", + " \"mpmath==1.3.0\",\n", + " \"msgpack==1.0.8\",\n", + " \"multidict==6.0.5\",\n", + " \"multipledispatch==1.0.0\",\n", + " 
\"nbclient==0.10.0\",\n", + " \"nbconvert==7.16.3\",\n", + " \"nbformat==5.10.3\",\n", + " \"nest-asyncio==1.6.0\",\n", + " \"networkx==3.2.1\",\n", + " \"notebook==6.1.5\",\n", + " \"notebook_shim==0.2.4\",\n", + " \"numpy==1.24.4\",\n", + " \"openai-clip==1.0.1\",\n", + " \"opencensus==0.11.4\",\n", + " \"opencensus-context==0.1.3\",\n", + " \"overrides==7.7.0\",\n", + " \"packaging==23.2\",\n", + " \"pandas==2.2.1\",\n", + " \"pandocfilters==1.5.1\",\n", + " \"parso==0.8.3\",\n", + " \"parsy==2.1\",\n", + " \"partd==1.4.1\",\n", + " \"pexpect==4.9.0\",\n", + " \"pgvector==0.2.5\",\n", + " \"pillow==10.2.0\",\n", + " \"pip==23.2.1\",\n", + " \"pip-tools==7.4.1\",\n", + " \"platformdirs==3.11.0\",\n", + " \"prettytable==3.10.0\",\n", + " \"prometheus_client==0.20.0\",\n", + " \"prompt-toolkit==3.0.43\",\n", + " \"proto-plus==1.23.0\",\n", + " \"protobuf==4.25.3\",\n", + " \"psutil==5.9.8\",\n", + " \"psycopg==3.1.18\",\n", + " \"psycopg-binary==3.1.18\",\n", + " \"psycopg-pool==3.2.1\",\n", + " \"psycopg2==2.9.9\",\n", + " \"psycopg2-binary==2.9.9\",\n", + " \"ptyprocess==0.7.0\",\n", + " \"pure-eval==0.2.2\",\n", + " \"py-spy==0.3.14\",\n", + " \"pyarrow==15.0.2\",\n", + " \"pyarrow-hotfix==0.6\",\n", + " \"pyasn1==0.5.1\",\n", + " \"pyasn1-modules==0.3.0\",\n", + " \"pycparser==2.21\",\n", + " \"pydantic==2.6.4\",\n", + " \"pydantic_core==2.16.3\",\n", + " \"Pygments==2.17.2\",\n", + " \"PyJWT==2.8.0\",\n", + " \"pylance==0.8.14\",\n", + " \"pymongo==4.6.2\",\n", + " \"pyOpenSSL==24.1.0\",\n", + " \"pyperclip==1.8.2\",\n", + " \"pyproject_hooks==1.0.0\",\n", + " \"python-dateutil==2.9.0.post0\",\n", + " \"python-dotenv==1.0.1\",\n", + " \"python-json-logger==2.0.7\",\n", + " \"pytz==2024.1\",\n", + " \"PyYAML==6.0.1\",\n", + " \"pyzmq==25.1.2\",\n", + " \"qtconsole==5.5.1\",\n", + " \"QtPy==2.4.1\",\n", + " \"ray==2.10.0\",\n", + " \"readerwriterlock==1.0.9\",\n", + " \"referencing==0.34.0\",\n", + " \"regex==2023.12.25\",\n", + " \"requests==2.31.0\",\n", + " \"rfc3339-validator==0.1.4\",\n", + " \"rfc3986-validator==0.1.1\",\n", + " \"rich==13.7.1\",\n", + " \"rpds-py==0.18.0\",\n", + " \"rsa==4.9\",\n", + " \"s3transfer==0.10.1\",\n", + " \"safetensors==0.4.2\",\n", + " \"scikit-learn==1.4.1.post1\",\n", + " \"scipy==1.12.0\",\n", + " \"Send2Trash==1.8.2\",\n", + " \"sentence-transformers==2.6.0\",\n", + " \"sentinels==1.0.0\",\n", + " \"setuptools==65.5.0\",\n", + " \"six==1.16.0\",\n", + " \"smart-open==7.0.3\",\n", + " \"sniffio==1.3.1\",\n", + " \"snowflake-connector-python==3.7.1\",\n", + " \"snowflake-sqlalchemy==1.5.1\",\n", + " \"sortedcontainers==2.4.0\",\n", + " \"soupsieve==2.5\",\n", + " \"SQLAlchemy==2.0.29\",\n", + " \"sqlalchemy-views==0.3.2\",\n", + " \"sqlglot==20.11.0\",\n", + " \"stack-data==0.6.3\",\n", + " \"starlette==0.36.3\",\n", + " \"-e git+https://github.com/makkarss929/superduperdb.git@009bdfa33128a19d10372ded5b47c9f0e052ca8f#egg=superduperdb\",\n", + " \"sympy==1.12\",\n", + " \"tblib==3.0.0\",\n", + " \"tenacity==8.2.3\",\n", + " \"terminado==0.18.1\",\n", + " \"threadpoolctl==3.4.0\",\n", + " \"tinycss2==1.2.1\",\n", + " \"tokenizers==0.15.2\",\n", + " \"tomlkit==0.12.4\",\n", + " \"toolz==0.12.1\",\n", + " \"torch==2.2.1\",\n", + " \"torchvision==0.17.1\",\n", + " \"tornado==6.4\",\n", + " \"tqdm==4.66.2\",\n", + " \"traitlets==5.14.2\",\n", + " \"transformers==4.39.1\",\n", + " \"typer==0.10.0\",\n", + " \"types-python-dateutil==2.9.0.20240316\",\n", + " \"typing_extensions==4.10.0\",\n", + " \"tzdata==2024.1\",\n", + " \"uri-template==1.3.0\",\n", + 
" \"urllib3==2.2.1\",\n", + " \"uvicorn==0.29.0\",\n", + " \"virtualenv==20.25.1\",\n", + " \"wcwidth==0.2.13\",\n", + " \"webcolors==1.13\",\n", + " \"webencodings==0.5.1\",\n", + " \"websocket-client==1.7.0\",\n", + " \"wheel==0.43.0\",\n", + " \"widgetsnbextension==4.0.10\",\n", + " \"wrapt==1.16.0\",\n", + " \"yarl==1.9.4\",\n", + " \"zict==3.0.0\",\n", + " \"zipp==3.18.1\"\n", + " ],\n", + " \"hostname\": \"Taruns-Laptop.local\",\n", + " \"os_uname\": [\n", + " \"Darwin\",\n", + " \"Taruns-Laptop.local\",\n", + " \"22.1.0\",\n", + " \"Darwin Kernel Version 22.1.0: Sun Oct 9 20:15:09 PDT 2022; root:xnu-8792.41.9~2/RELEASE_ARM64_T6000\",\n", + " \"x86_64\"\n", + " ],\n", + " \"package_versions\": {},\n", + " \"platform\": {\n", + " \"platform\": \"macOS-10.16-x86_64-i386-64bit\",\n", + " \"python_version\": \"3.11.5\"\n", + " },\n", + " \"startup_time\": \"2024-04-18 02:14:23.585596\",\n", + " \"superduper_db_root\": \"/Users/tarun/Desktop/superduperDB/superduperdb\",\n", + " \"sys\": {\n", + " \"argv\": [\n", + " \"/Users/tarun/Desktop/superduperDB/superduperdb/superduperdb/__main__.py\",\n", + " \"info\"\n", + " ],\n", + " \"path\": [\n", + " \"/Users/tarun/Desktop/superduperDB/superduperdb/examples\",\n", + " \"/Users/tarun/miniconda3/lib/python311.zip\",\n", + " \"/Users/tarun/miniconda3/lib/python3.11\",\n", + " \"/Users/tarun/miniconda3/lib/python3.11/lib-dynload\",\n", + " \"/Users/tarun/Desktop/superduperDB/superduperdb/.venv/lib/python3.11/site-packages\",\n", + " \"__editable__.superduperdb-0.1.1.finder.__path_hook__\"\n", + " ]\n", + " }\n", + "}\n", + "```\n" + ] + } + ], + "source": [ + "!python -m superduperdb info" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "4403b27e", + "metadata": {}, + "outputs": [], + "source": [ + "import glob\n", + "import re\n", + "\n", + "ROOT = '../docs/hr/content/docs/'\n", + "\n", + "STRIDE = 3 # stride in numbers of lines\n", + "WINDOW = 25 # length of window in numbers of lines\n", + "\n", + "files = sorted(glob.glob(f'{ROOT}/**/*.md', recursive=True))\n", + "\n", + "def get_chunk_link(chunk, file_name):\n", + " # Get the original link of the chunk\n", + " file_link = file_name[:-3].replace(ROOT, 'https://docs.superduperdb.com/docs/docs/')\n", + " # If the chunk has subtitles, the link to the first subtitle will be used first.\n", + " first_title = (re.findall(r'(^|\\n)## (.*?)\\n', chunk) or [(None, None)])[0][1]\n", + " if first_title:\n", + " # Convert subtitles and splice URLs\n", + " first_title = first_title.lower()\n", + " first_title = re.sub(r'[^a-zA-Z0-9]', '-', first_title)\n", + " file_link = file_link + '#' + first_title\n", + " return file_link\n", + "\n", + "def create_chunk_and_links(file, file_prefix=ROOT):\n", + " with open(file, 'r') as f:\n", + " lines = f.readlines()\n", + " if len(lines) > WINDOW:\n", + " chunks = ['\\n'.join(lines[i: i + WINDOW]) for i in range(0, len(lines), STRIDE)]\n", + " else:\n", + " chunks = ['\\n'.join(lines)]\n", + " return [{'txt': chunk, 'link': get_chunk_link(chunk, file)} for chunk in chunks]\n", + "\n", + "\n", + "all_chunks_and_links = sum([create_chunk_and_links(file) for file in files], [])" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "0bc95cfb", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " % Total % Received % Xferd Average Speed Time Time Time Current\n", + " Dload Upload Total Spent Left Speed\n", + "100 737k 100 737k 0 0 332k 0 0:00:02 0:00:02 --:--:-- 333k\n" + ] + } + ], + 
"source": [ + "# Use !curl to download the 'superduperdb_docs.json' file\n", + "!curl -O https://datas-public.s3.amazonaws.com/superduperdb_docs.json\n", + "\n", + "import json\n", + "from IPython.display import Markdown\n", + "\n", + "# Open the downloaded JSON file and load its contents into the 'chunks' variable\n", + "with open('superduperdb_docs.json') as f:\n", + " all_chunks_and_links = json.load(f)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "1a99d3af", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'txt': '# Anthropic\\n\\n\\n\\n`superduperdb` allows users to work with `anthropic` API models.\\n\\n\\n\\nRead more about this [here](/docs/docs/walkthrough/ai_models#anthropic).',\n", + " 'link': 'https://docs.superduperdb.com/docs/docs/ai_integrations/anthropic'}" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "all_chunks_and_links[0]" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "1a55feba", + "metadata": {}, + "outputs": [], + "source": [ + "new_all_chunks_and_links = list()\n", + "for i, e in enumerate(all_chunks_and_links):\n", + " e['id'] = i\n", + " new_all_chunks_and_links.append(e)" + ] + }, + { + "cell_type": "markdown", + "id": "9ae3ce2b", + "metadata": {}, + "source": [ + "## Define Schema and Create table\n", + "\n", + "For this use-case, you need a table with images and another table with text. SuperDuperDB extends standard SQL functionality, allowing developers to define their own data types through the `Encoder` abstraction." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "cafbb553", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Schema(identifier='questiondocs-schema', fields={'id': FieldType(identifier='String'), 'txt': FieldType(identifier='String'), 'link': FieldType(identifier='String')})" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "Schema(\n", + " 'questiondocs-schema',\n", + " fields={'id': dtype(str), 'txt': dtype(str), 'link': dtype(str)},\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "01157866", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "([], Table(identifier='questiondocs'))" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# \n", + "# Define the 'captions' table\n", + "table = Table(\n", + " 'questiondocs',\n", + " primary_id='id',\n", + " schema=Schema(\n", + " 'questiondocs-schema',\n", + " fields={'id': dtype(str), 'txt': dtype(str), 'link': dtype(str)},\n", + " )\n", + ")\n", + "\n", + "\n", + "\n", + "# Add the 'captions' and 'images' tables to the SuperDuperDB database\n", + "db.add(table)" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "19285213", + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "893ca27f", + "metadata": {}, + "outputs": [], + "source": [ + "new_all_chunks_and_links_df = pd.DataFrame(new_all_chunks_and_links)" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "e40ac657", + "metadata": {}, + "outputs": [], + "source": [ + "df = new_all_chunks_and_links_df.astype(str)" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "5ce19377", + "metadata": {}, + "outputs": [], + "source": [ + "from superduperdb.base.document import 
Document as D\n" + ] + }, + { + "cell_type": "raw", + "id": "6a8890af", + "metadata": {}, + "source": [ + "table.insert(df[['id', 'txt', 'link']])" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "c9360f32", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[32m 2024-Apr-18 02:14:26.91\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.backends.local.compute\u001b[0m:\u001b[36m34 \u001b[0m | \u001b[1mSubmitting job. function:\u001b[0m\n", + "\u001b[32m 2024-Apr-18 02:14:27.15\u001b[0m| \u001b[32m\u001b[1mSUCCESS \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.backends.local.compute\u001b[0m:\u001b[36m40 \u001b[0m | \u001b[32m\u001b[1mJob submitted on . function: future:16073078-3d35-4e73-90ba-e7a862f6b03a\u001b[0m\n" + ] + } + ], + "source": [ + "insert = table.insert(\n", + " [\n", + " D(\n", + " {\n", + " 'id': d['id'],\n", + " 'txt': d['txt'],\n", + " 'link': d['link'],\n", + " }\n", + " )\n", + " for i, d in df.iterrows()\n", + " ]\n", + " )\n", + "_ = db.execute(insert)" + ] + }, + { + "cell_type": "raw", + "id": "4b531e01", + "metadata": { + "scrolled": true + }, + "source": [ + "_ = db.execute(table.insert(df[['id', 'txt', 'link']]))" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "f819881e", + "metadata": {}, + "outputs": [], + "source": [ + "q = table.select('txt', 'link')" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "42b549b6", + "metadata": {}, + "outputs": [], + "source": [ + "result = db.execute(q)" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "d2fcdeef", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'txt': '# Anthropic\\n\\n\\n\\n`superduperdb` allows users to work with `anthropic` API models.\\n\\n\\n\\nRead more about this [here](/docs/docs/walkthrough/ai_models#anthropic).',\n", + " 'link': 'https://docs.superduperdb.com/docs/docs/ai_integrations/anthropic'}" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "result[0]" + ] + }, + { + "cell_type": "markdown", + "id": "8715ffd0", + "metadata": {}, + "source": [ + "A `Model` is a wrapper around a self-built or ecosystem model, such as `torch`, `transformers`, `openai`." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "06f57c03", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "data": { + "text/plain": [ + "DataType(identifier='vector[1024]', encoder=None, decoder=None, info=None, shape=(1024,), directory=None, encodable='native', bytes_encoding=)" + ] + }, + "execution_count": 20, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from superduperdb import vector\n", + "vector(shape=(1024,))" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "890e9607", + "metadata": {}, + "outputs": [], + "source": [ + "import sentence_transformers\n", + "from superduperdb.ext.sentence_transformers import SentenceTransformer\n", + "from superduperdb.ext.numpy import array" + ] + }, + { + "cell_type": "markdown", + "id": "4ec039cb", + "metadata": {}, + "source": [ + "### Model" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "c617f81b", + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "model = SentenceTransformer(\n", + " identifier=\"embedding\",\n", + " object=sentence_transformers.SentenceTransformer(\"BAAI/bge-large-en-v1.5\"),\n", + " postprocess=lambda x: x.tolist(),\n", + " datatype=vector(shape=(1024,)),\n", + " predict_kwargs={\"show_progress_bar\": True},\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "id": "f02690d7", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "afeed59560bd4e6c943b9ff065a8cc28", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Batches: 0%| | 0/1 [00:00, field='vector_index', callable=None)" + ] + }, + "execution_count": 26, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "db.vector_indices" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "id": "1ca82a99", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[32m 2024-Apr-18 02:14:37.48\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.components.component\u001b[0m:\u001b[36m374 \u001b[0m | \u001b[1mInitializing DataType : dill\u001b[0m\n", + "\u001b[32m 2024-Apr-18 02:14:37.48\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.components.component\u001b[0m:\u001b[36m377 \u001b[0m | \u001b[1mInitialized DataType : dill successfully\u001b[0m\n", + "\u001b[32m 2024-Apr-18 02:14:50.09\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.backends.local.compute\u001b[0m:\u001b[36m34 \u001b[0m | \u001b[1mSubmitting job. 
function:\u001b[0m\n", + "\u001b[32m 2024-Apr-18 02:14:52.33\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.components.model\u001b[0m:\u001b[36m446 \u001b[0m | \u001b[1mQuery not found in metadata, adding...\u001b[0m\n", + "\u001b[32m 2024-Apr-18 02:14:52.33\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.components.model\u001b[0m:\u001b[36m448 \u001b[0m | \u001b[1mDone\u001b[0m\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "100%|██████████████████████████████████████████████████████████████████████████████████████████████| 992/992 [00:00<00:00, 457526.89it/s]\n" + ] + }, + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "8d3204745a61400b8bad7b8446b94848", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Batches: 0%| | 0/31 [00:00. function: future:29e8f5e8-18db-4e67-b443-8d550d080cf6\u001b[0m\n" + ] + }, + { + "data": { + "text/plain": [ + "([],\n", + " Listener(identifier='listener1', key='txt', model=SentenceTransformer(preferred_devices=('cuda', 'mps', 'cpu'), device='cpu', identifier='embedding', signature='singleton', datatype=DataType(identifier='vector[1024]', encoder=None, decoder=None, info=None, shape=(1024,), directory=None, encodable='native', bytes_encoding=), output_schema=None, flatten=False, model_update_kwargs={}, predict_kwargs={'show_progress_bar': True}, compute_kwargs={}, object=SentenceTransformer(\n", + " (0): Transformer({'max_seq_length': 512, 'do_lower_case': True}) with Transformer model: BertModel \n", + " (1): Pooling({'word_embedding_dimension': 1024, 'pooling_mode_cls_token': True, 'pooling_mode_mean_tokens': False, 'pooling_mode_max_tokens': False, 'pooling_mode_mean_sqrt_len_tokens': False, 'pooling_mode_weightedmean_tokens': False, 'pooling_mode_lasttoken': False, 'include_prompt': True})\n", + " (2): Normalize()\n", + " ), model='embedding', preprocess=None, postprocess= at 0x1a3d7efc0>), select=, active=True, predict_kwargs={'max_chunk_size': 3000}))" + ] + }, + "execution_count": 27, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "db.add(listener1)" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "id": "8b63c4b7", + "metadata": {}, + "outputs": [], + "source": [ + "from superduperdb import VectorIndex" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "id": "b3b57380", + "metadata": {}, + "outputs": [], + "source": [ + "vi = VectorIndex(\n", + " identifier='my-index', # Unique identifier for the VectorIndex\n", + " indexing_listener=listener1, # Listener to be used for indexing documents\n", + " measure='cosine'\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "id": "b4326418", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[32m 2024-Apr-18 02:27:43.92\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.backends.local.compute\u001b[0m:\u001b[36m34 \u001b[0m | \u001b[1mSubmitting job. function:\u001b[0m\n", + "\u001b[32m 2024-Apr-18 02:27:43.92\u001b[0m| \u001b[32m\u001b[1mSUCCESS \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.backends.local.compute\u001b[0m:\u001b[36m40 \u001b[0m | \u001b[32m\u001b[1mJob submitted on . 
function: future:bae6a1be-697e-437d-addd-8084463a392f\u001b[0m\n" + ] + } + ], + "source": [ + "jobs, _ = db.add(vi)" + ] + }, + { + "cell_type": "markdown", + "id": "2bbbf861", + "metadata": {}, + "source": [ + "## Inference" + ] + }, + { + "cell_type": "code", + "execution_count": 38, + "id": "c1e60e9a", + "metadata": { + "scrolled": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[32m 2024-Apr-18 03:06:23.01\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.base.datalayer\u001b[0m:\u001b[36m974 \u001b[0m | \u001b[1m{}\u001b[0m\n" + ] + }, + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "b19d57e46bf040d086cff320d67fd997", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Batches: 0%| | 0/1 [00:00" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/markdown": [ + "```\n", + "\n", + "\n", + "\n", + "Read more about the `VectorIndex` concept [here](../walkthrough/vector_search.md).\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "'https://docs.superduperdb.com/docs/docs/fundamentals/procedural_vs_declarative_api'" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/markdown": [ + "---" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/markdown": [ + "the vectors calculated by the `Listener`, and, is fitted\n", + "\n", + "based on those vectors and the label set.\n", + "\n", + "\n", + "\n", + "```python\n", + "\n", + "from sklearn.svm import SVC\n", + "\n", + "from my_models.vision import MyTorchModule, prepare_image\n", + "\n", + "\n", + "\n", + "from superduperdb.ext.numpy import array\n", + "\n", + "from superduperdb.ext.sklearn import Estimator\n", + "\n", + "from superduperdb.ext.torch import TorchModel\n", + "\n", + "from superduperdb import Stack, VectorIndex, Listener\n", + "\n", + "from superduperdb.backends.mongodb import Collection\n", + "\n", + "\n", + "\n", + "collection = Collection('images')\n", + "\n", + "\n", + "\n", + "my_listener=Listener(\n", + "\n", + " 'my-listener',\n", + "\n", + " model=TorchModel(\n", + "\n", + " 'my-cnn-vectorizer',\n", + "\n", + " object=MyTorchModule(),\n", + "\n", + " preprocess=prepare_image,\n", + "\n", + " postprocess=lambda x: x.numpy(),\n", + "\n", + " encoder=array(dtype='float', shape=(512,))\n", + "\n", + " )\n", + "\n", + " key='img',\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "'https://docs.superduperdb.com/docs/docs/walkthrough/creating_stacks_of_functionality'" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/markdown": [ + "---" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/markdown": [ + "from superduperdb.ext.torch import TorchModel\n", + "\n", + "from superduperdb import Stack, VectorIndex, Listener\n", + "\n", + "from superduperdb.backends.mongodb import Collection\n", + "\n", + "\n", + "\n", + "collection = Collection('images')\n", + "\n", + "\n", + "\n", + "my_listener=Listener(\n", + "\n", + " 'my-listener',\n", + "\n", + " model=TorchModel(\n", + "\n", + " 'my-cnn-vectorizer',\n", + "\n", + " object=MyTorchModule(),\n", + "\n", + " preprocess=prepare_image,\n", + "\n", 
+ " postprocess=lambda x: x.numpy(),\n", + "\n", + " encoder=array(dtype='float', shape=(512,))\n", + "\n", + " )\n", + "\n", + " key='img',\n", + "\n", + " select=collection.find({'_fold': 'train'})\n", + "\n", + ")\n", + "\n", + "\n", + "\n", + "db.add(\n", + "\n", + " Stack(\n", + "\n", + " 'my-stack',\n", + "\n", + " [\n", + "\n", + " my_listener,\n", + "\n", + " VectorIndex(\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "'https://docs.superduperdb.com/docs/docs/walkthrough/creating_stacks_of_functionality'" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/markdown": [ + "---" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/markdown": [ + "\n", + "\n", + "collection = Collection('images')\n", + "\n", + "\n", + "\n", + "my_listener=Listener(\n", + "\n", + " 'my-listener',\n", + "\n", + " model=TorchModel(\n", + "\n", + " 'my-cnn-vectorizer',\n", + "\n", + " object=MyTorchModule(),\n", + "\n", + " preprocess=prepare_image,\n", + "\n", + " postprocess=lambda x: x.numpy(),\n", + "\n", + " encoder=array(dtype='float', shape=(512,))\n", + "\n", + " )\n", + "\n", + " key='img',\n", + "\n", + " select=collection.find({'_fold': 'train'})\n", + "\n", + ")\n", + "\n", + "\n", + "\n", + "db.add(\n", + "\n", + " Stack(\n", + "\n", + " 'my-stack',\n", + "\n", + " [\n", + "\n", + " my_listener,\n", + "\n", + " VectorIndex(\n", + "\n", + " 'my-index',\n", + "\n", + " indexing_listener=my_listener,\n", + "\n", + " ),\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "'https://docs.superduperdb.com/docs/docs/walkthrough/creating_stacks_of_functionality'" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/markdown": [ + "---" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/markdown": [ + "my_listener=Listener(\n", + "\n", + " 'my-listener',\n", + "\n", + " model=TorchModel(\n", + "\n", + " 'my-cnn-vectorizer',\n", + "\n", + " object=MyTorchModule(),\n", + "\n", + " preprocess=prepare_image,\n", + "\n", + " postprocess=lambda x: x.numpy(),\n", + "\n", + " encoder=array(dtype='float', shape=(512,))\n", + "\n", + " )\n", + "\n", + " key='img',\n", + "\n", + " select=collection.find({'_fold': 'train'})\n", + "\n", + ")\n", + "\n", + "\n", + "\n", + "db.add(\n", + "\n", + " Stack(\n", + "\n", + " 'my-stack',\n", + "\n", + " [\n", + "\n", + " my_listener,\n", + "\n", + " VectorIndex(\n", + "\n", + " 'my-index',\n", + "\n", + " indexing_listener=my_listener,\n", + "\n", + " ),\n", + "\n", + " Estimator(\n", + "\n", + " 'my-classifier',\n", + "\n", + " object=SVC()\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "'https://docs.superduperdb.com/docs/docs/walkthrough/creating_stacks_of_functionality'" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/markdown": [ + "---" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 1.33 s, sys: 273 ms, total: 1.6 s\n", + "Wall time: 360 ms\n" + ] + } + ], + "source": [ + "%%time\n", + "from superduperdb.backends.ibis import Table\n", + "from superduperdb import 
Document as D\n", + "from IPython.display import *\n", + "\n", + "# Define the query for the search\n", + "query = 'Code snippet how to create a `VectorIndex` with a torchvision model'\n", + "# query = 'can you explain vector-indexes with `superduperdb`?'\n", + "\n", + "# Execute a search using SuperDuperDB to find documents containing the specified query\n", + "result = db.execute(\n", + " query=table.like(D({'txt': query}), vector_index='my-index', n=5).select('id', 'txt', 'link')\n", + ")\n", + "\n", + "# Display a horizontal rule to separate results\n", + "display(Markdown('---'))\n", + "\n", + "# Display each document's 'txt' field and separate them with a horizontal rule\n", + "for r in result:\n", + " display(Markdown(r['txt']))\n", + " display(r['link'])\n", + " display(Markdown('---'))" + ] + }, + { + "cell_type": "markdown", + "id": "fa31f8be", + "metadata": {}, + "source": [ + "## Future Works\n", + "1. HNSW\n", + "2. IVFFlat " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "766edab3", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/superduperdb/backends/base/backends.py b/superduperdb/backends/base/backends.py index 9f1825a70..334a85087 100644 --- a/superduperdb/backends/base/backends.py +++ b/superduperdb/backends/base/backends.py @@ -32,7 +32,7 @@ 'lance': LanceVectorSearcher, 'in_memory': InMemoryVectorSearcher, 'mongodb+srv': MongoAtlasVectorSearcher, - 'postgres': PostgresVectorSearcher + 'pg_vector': PostgresVectorSearcher } CONNECTIONS = { diff --git a/superduperdb/backends/ibis/query.py b/superduperdb/backends/ibis/query.py index 89ac746be..8c81bbce7 100644 --- a/superduperdb/backends/ibis/query.py +++ b/superduperdb/backends/ibis/query.py @@ -3,6 +3,8 @@ import re import types import typing as t +from superduperdb import CFG +import psycopg2 import pandas @@ -49,25 +51,55 @@ def _model_update_impl( outputs: t.Sequence[t.Any], flatten: bool = False, ): - if flatten: - raise NotImplementedError('Flatten not yet supported for ibis') + if CFG.cluster.vector_search.type == 'in_memory': + if flatten: + raise NotImplementedError('Flatten not yet supported for ibis') - if not outputs: - return - - table_records = [] - for ix in range(len(outputs)): - d = { - '_input_id': str(ids[ix]), - 'output': outputs[ix], - } - table_records.append(d) + if not outputs: + return - for r in table_records: - if isinstance(r['output'], dict) and '_content' in r['output']: - r['output'] = r['output']['_content']['bytes'] + table_records = [] + for ix in range(len(outputs)): + d = { + '_input_id': str(ids[ix]), + 'output': outputs[ix], + } + table_records.append(d) + + for r in table_records: + if isinstance(r['output'], dict) and '_content' in r['output']: + r['output'] = r['output']['_content']['bytes'] + + db.databackend.insert(f'_outputs.{predict_id}', table_records) + + elif CFG.cluster.vector_search.type == 'pg_vector': + # Connect to your PostgreSQL database + conn = psycopg2.connect(CFG.cluster.vector_search.uri) + table_name = f'_outputs.{predict_id}' + with conn.cursor() as cursor: + cursor.execute('CREATE EXTENSION IF 
NOT EXISTS vector') + cursor.execute(f"""DROP TABLE IF EXISTS "{table_name}";""") + cursor.execute( + f"""CREATE TABLE "{table_name}" ( + _input_id VARCHAR PRIMARY KEY, + output vector(1024), + _fold VARCHAR + ); + """ + ) + for ix in range(len(outputs)): + try: + cursor.execute( + f"""INSERT INTO "{table_name}" (_input_id, output) VALUES (%s, %s);""", + [str(ids[ix]), outputs[ix]] + ) + except: + pass - db.databackend.insert(f'_outputs.{predict_id}', table_records) + # Commit the transaction + conn.commit() + # Close the connection + conn.close() class IbisBackendError(Exception): @@ -183,7 +215,7 @@ def compile(self, db: 'Datalayer', tables: t.Optional[t.Dict] = None): if tables is None: tables = {} if table_id not in tables: - tables[table_id] = db.databackend.conn.table(table_id) + tables[table_id] = db.databackend.conn.tables.get(table_id) return self.query_linker.compile(db, tables=tables) def get_all_tables(self): diff --git a/superduperdb/vector_search/interface.py b/superduperdb/vector_search/interface.py index 3b9e6c54d..91cc7fa50 100644 --- a/superduperdb/vector_search/interface.py +++ b/superduperdb/vector_search/interface.py @@ -16,15 +16,16 @@ def __init__(self, db: 'Datalayer', vector_searcher, vector_index: str): self.vector_index = vector_index if CFG.cluster.vector_search.uri is not None: - if not db.server_mode: - request_server( - service='vector_search', - endpoint='create/search', - args={ - 'vector_index': self.vector_index, - }, - type='get', - ) + if CFG.cluster.vector_search.type != 'pg_vector': + if not db.server_mode: + request_server( + service='vector_search', + endpoint='create/search', + args={ + 'vector_index': self.vector_index, + }, + type='get', + ) def __len__(self): return len(self.searcher) @@ -103,13 +104,14 @@ def find_nearest_from_array( :param n: number of nearest vectors to return """ if CFG.cluster.vector_search.uri is not None: - response = request_server( - service='vector_search', - data=h, - endpoint='query/search', - args={'vector_index': self.vector_index, 'n': n}, - ) - return response['ids'], response['scores'] + if CFG.cluster.vector_search.type != 'pg_vector': + response = request_server( + service='vector_search', + data=h, + endpoint='query/search', + args={'vector_index': self.vector_index, 'n': n}, + ) + return response['ids'], response['scores'] return self.searcher.find_nearest_from_array(h=h, n=n, within_ids=within_ids) diff --git a/superduperdb/vector_search/postgres.py b/superduperdb/vector_search/postgres.py index 4a04ceba4..85fb052da 100644 --- a/superduperdb/vector_search/postgres.py +++ b/superduperdb/vector_search/postgres.py @@ -79,7 +79,7 @@ def __init__( with self.connection.cursor() as cursor: cursor.execute('CREATE EXTENSION IF NOT EXISTS vector') cursor.execute( - 'CREATE TABLE IF NOT EXISTS "%s" (id varchar, txt VARCHAR, embedding vector(%d))' + 'CREATE TABLE IF NOT EXISTS "%s" (id varchar, txt VARCHAR, output vector(%d))' % (self.identifier, self.dimensions) ) self.connection.commit() @@ -96,11 +96,11 @@ def __len__(self): def get_measure_query(self): if self.measure.value == "l2": - return "embedding <-> '%s'" + return "output <-> '%s'" elif self.measure.value == "dot": - return "(embedding <#> '%s') * -1" + return "(output <#> '%s') * -1" elif self.measure.value == "cosine": - return "1 - (embedding <=> '%s')" + return "(output <=> '%s')" else: raise NotImplementedError("Unrecognized measure format") @@ -110,7 +110,7 @@ def _create_or_append_to_dataset(self, vectors, ids): for id_, vector in zip(ids, vectors): 
try: cursor.execute( - "INSERT INTO %s (id, embedding) VALUES (%s, '%s');" % (self.identifier, id_, vector) + "INSERT INTO %s (id, output) VALUES (%s, '%s');" % (self.identifier, id_, vector) ) except Exception as e: pass @@ -120,13 +120,13 @@ def _create_index(self): with self.connection.cursor() as cursor: if self.indexing.name == 'hnsw': cursor.execute("""CREATE INDEX ON %s - USING %s (embedding %s) + USING %s (output %s) WITH (m = %s, ef_construction = %s);""" % (self.identifier, self.indexing.name, self.indexing_measure, self.indexing.m, self.indexing.ef_construction)) cursor.execute("""SET %s.ef_search = %s;""" % (self.indexing.name, self.indexing.ef_search)) elif self.indexing.name == 'ivfflat': cursor.execute("""CREATE INDEX ON %s - USING %s (embedding %s) + USING %s (output %s) WITH (lists = %s);""" % (self.identifier, self.indexing.name, self.indexing_measure, self.indexing.lists)) cursor.execute("""SET %s.probes = %s;""" % (self.indexing.name, self.indexing.probes)) @@ -174,7 +174,7 @@ def find_nearest_from_id( with self.connection.cursor() as curr: curr.execute( """ - SELECT embedding + SELECT output FROM %s WHERE id = '%s'""" % (self.identifier, _id) @@ -200,15 +200,16 @@ def find_nearest_from_array( within_ids_str = ', '.join([f"'{i}'" for i in within_ids]) condition = f"id in ({within_ids_str})" query_search_nearest = f""" - SELECT id, {self.measure_query} as distance - FROM %s + SELECT _input_id, {self.measure_query} as distance + FROM "%s" WHERE %s - ORDER BY distance + ORDER BY distance ASC LIMIT %d """ + with self.connection.cursor() as curr: curr.execute( - query_search_nearest % (json.dumps(h), self.identifier, condition, n) + query_search_nearest % (list(h), self.identifier, condition, n) ) nearest_items = curr.fetchall() ids = [row[0] for row in nearest_items] @@ -240,9 +241,10 @@ def from_component(cls, vi: 'VectorIndex'): indexing_version = vi.indexing_listener.model.version output_path = f'_outputs.{vi.indexing_listener.predict_id}' + print(output_path) return PostgresVectorSearcher( - uri=CFG.data_backend, + uri=CFG.cluster.vector_search.uri, identifier=output_path, dimensions=vi.dimensions, measure=VectorIndexMeasureType.cosine, diff --git a/superduperdb/vector_search/update_tasks.py b/superduperdb/vector_search/update_tasks.py index c5f7abd3a..cb9dd71ee 100644 --- a/superduperdb/vector_search/update_tasks.py +++ b/superduperdb/vector_search/update_tasks.py @@ -6,6 +6,7 @@ from superduperdb.base.serializable import Serializable from superduperdb.misc.special_dicts import MongoStyleDict from superduperdb.vector_search.base import VectorItem +from superduperdb import CFG def delete_vectors( @@ -40,48 +41,49 @@ def copy_vectors( :param db: A ``DB`` instance. """ - vi = db.vector_indices[vector_index] - if isinstance(query, dict): - # ruff: noqa: E501 - query: CompoundSelect = Serializable.decode(query) # type: ignore[no-redef] - assert isinstance(query, CompoundSelect) - if not ids: - select = query - else: - select = query.select_using_ids(ids) - docs = db.select(select) - docs = [doc.unpack() for doc in docs] - key = vi.indexing_listener.key - if '_outputs.' 
in key: - key = key.split('.')[1] - # TODO: Refactor the below logic - vectors = [] - if isinstance(db.databackend, MongoDataBackend): - vectors = [ - { - 'vector': MongoStyleDict(doc)[ - f'_outputs.{vi.indexing_listener.predict_id}' - ], - 'id': str(doc['_id']), - } - for doc in docs - ] - elif isinstance(db.databackend, IbisDataBackend): - docs = db.execute(select.outputs(vi.indexing_listener.predict_id)) - from superduperdb.backends.ibis.data_backend import INPUT_KEY + if CFG.cluster.vector_search.type != 'pg_vector': + vi = db.vector_indices[vector_index] + if isinstance(query, dict): + # ruff: noqa: E501 + query: CompoundSelect = Serializable.decode(query) # type: ignore[no-redef] + assert isinstance(query, CompoundSelect) + if not ids: + select = query + else: + select = query.select_using_ids(ids) + docs = db.select(select) + docs = [doc.unpack() for doc in docs] + key = vi.indexing_listener.key + if '_outputs.' in key: + key = key.split('.')[1] + # TODO: Refactor the below logic + vectors = [] + if isinstance(db.databackend, MongoDataBackend): + vectors = [ + { + 'vector': MongoStyleDict(doc)[ + f'_outputs.{vi.indexing_listener.predict_id}' + ], + 'id': str(doc['_id']), + } + for doc in docs + ] + elif isinstance(db.databackend, IbisDataBackend): + docs = db.execute(select.outputs(vi.indexing_listener.predict_id)) + from superduperdb.backends.ibis.data_backend import INPUT_KEY - vectors = [ - { - 'vector': doc[f'_outputs.{vi.indexing_listener.predict_id}'], - 'id': str(doc[INPUT_KEY]), - } - for doc in docs - ] - for r in vectors: - if hasattr(r['vector'], 'numpy'): - r['vector'] = r['vector'].numpy() + vectors = [ + { + 'vector': doc[f'_outputs.{vi.indexing_listener.predict_id}'], + 'id': str(doc[INPUT_KEY]), + } + for doc in docs + ] + for r in vectors: + if hasattr(r['vector'], 'numpy'): + r['vector'] = r['vector'].numpy() - if vectors: - db.fast_vector_searchers[vi.identifier].add( - [VectorItem(**vector) for vector in vectors] - ) + if vectors: + db.fast_vector_searchers[vi.identifier].add( + [VectorItem(**vector) for vector in vectors] + ) From a9f9a236a66f57bf25c69c98a6506ea100a0f840 Mon Sep 17 00:00:00 2001 From: Tarun Makkar Date: Sun, 21 Apr 2024 02:01:03 +0530 Subject: [PATCH 9/9] hnsw, ivfflat --- examples/pgvector.ipynb | 2142 ++--------------------- superduperdb/backends/ibis/query.py | 2 + superduperdb/components/vector_index.py | 27 + superduperdb/vector_search/postgres.py | 11 +- 4 files changed, 154 insertions(+), 2028 deletions(-) diff --git a/examples/pgvector.ipynb b/examples/pgvector.ipynb index 8fc3b55b3..0de6762b2 100644 --- a/examples/pgvector.ipynb +++ b/examples/pgvector.ipynb @@ -2,15 +2,15 @@ "cells": [ { "cell_type": "markdown", - "id": "fc57749f", + "id": "be320b36", "metadata": {}, "source": [ - "# Building Private Q&A Assistant Using Postgres + Pgvector and Open Source Model" + "# Postgres + Pgvector (HNSW and IVFflat Indexing)" ] }, { "cell_type": "markdown", - "id": "cafefc8a", + "id": "3d6841b7", "metadata": {}, "source": [ "## Prerequisites\n", @@ -20,8 +20,8 @@ }, { "cell_type": "code", - "execution_count": 1, - "id": "7c56cd0b", + "execution_count": null, + "id": "f6d47e1a", "metadata": { "scrolled": true }, @@ -37,8 +37,8 @@ }, { "cell_type": "code", - "execution_count": 2, - "id": "9590a2b1", + "execution_count": null, + "id": "61b8392b", "metadata": {}, "outputs": [], "source": [ @@ -47,7 +47,7 @@ }, { "cell_type": "markdown", - "id": "c65ffd13", + "id": "a44fba27", "metadata": {}, "source": [ "## Connect to datastore \n", @@ -63,8 +63,8 @@ 
}, { "cell_type": "code", - "execution_count": 3, - "id": "cf641454", + "execution_count": null, + "id": "a4e7a535", "metadata": {}, "outputs": [], "source": [ @@ -73,24 +73,12 @@ }, { "cell_type": "code", - "execution_count": 4, - "id": "20ac07ae", + "execution_count": null, + "id": "8f9fb3c1", "metadata": { "scrolled": true }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\u001b[32m 2024-Apr-18 02:14:21.59\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.base.build\u001b[0m:\u001b[36m65 \u001b[0m | \u001b[1mData Client is ready. \u001b[0m\n", - "\u001b[32m 2024-Apr-18 02:14:21.60\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.base.build\u001b[0m:\u001b[36m30 \u001b[0m | \u001b[1mConnecting to Metadata Client: sqlite:///.superduperdb/metadata.sqlite\u001b[0m\n", - "\u001b[32m 2024-Apr-18 02:14:21.62\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.backends.local.artifacts\u001b[0m:\u001b[36m29 \u001b[0m | \u001b[1mCreating artifact store directory\u001b[0m\n", - "\u001b[32m 2024-Apr-18 02:14:21.62\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.base.build\u001b[0m:\u001b[36m160 \u001b[0m | \u001b[1mConnecting to compute client: None\u001b[0m\n", - "\u001b[32m 2024-Apr-18 02:14:21.62\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.base.datalayer\u001b[0m:\u001b[36m89 \u001b[0m | \u001b[1mBuilding Data Layer\u001b[0m\n" - ] - } - ], + "outputs": [], "source": [ "from superduperdb import superduper\n", "from superduperdb.backends.ibis import Table\n", @@ -106,329 +94,23 @@ "db = superduper(\n", " connection_uri,\n", " metadata_store='sqlite:///.superduperdb/metadata.sqlite',\n", - "# cluster__vector_search = vs,\n", ")" ] }, { "cell_type": "code", - "execution_count": 5, - "id": "87a0fdf1", + "execution_count": null, + "id": "c6f93d7d", "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "```\n", - "{\n", - " \"cfg\": {\n", - " \"data_backend\": \"mongodb://localhost:27017/test_db\",\n", - " \"lance_home\": \".superduperdb/vector_indices\",\n", - " \"artifact_store\": null,\n", - " \"metadata_store\": null,\n", - " \"cluster\": {\n", - " \"compute\": {\n", - " \"uri\": null,\n", - " \"compute_kwargs\": {}\n", - " },\n", - " \"vector_search\": {\n", - " \"uri\": \"postgresql://postgres:test@localhost:8000/qa\",\n", - " \"type\": \"pg_vector\",\n", - " \"backfill_batch_size\": 100\n", - " },\n", - " \"cdc\": {\n", - " \"uri\": null,\n", - " \"strategy\": null\n", - " }\n", - " },\n", - " \"retries\": {\n", - " \"stop_after_attempt\": 2,\n", - " \"wait_max\": 10.0,\n", - " \"wait_min\": 4.0,\n", - " \"wait_multiplier\": 1.0\n", - " },\n", - " \"downloads\": {\n", - " \"folder\": null,\n", - " \"n_workers\": 0,\n", - " \"headers\": {\n", - " \"User-Agent\": \"me\"\n", - " },\n", - " \"timeout\": null\n", - " },\n", - " \"fold_probability\": 0.05,\n", - " \"log_level\": \"INFO\",\n", - " \"logging_type\": \"SYSTEM\",\n", - " \"bytes_encoding\": \"Bytes\"\n", - " },\n", - " \"cwd\": \"/Users/tarun/Desktop/superduperDB/superduperdb/examples\",\n", - " \"freeze\": [\n", - " \"aiohttp==3.9.3\",\n", - " \"aiohttp-cors==0.7.0\",\n", - " \"aiosignal==1.3.1\",\n", - " \"annotated-types==0.6.0\",\n", - " \"anyio==4.3.0\",\n", - " \"appnope==0.1.4\",\n", - " 
\"argon2-cffi==23.1.0\",\n", - " \"argon2-cffi-bindings==21.2.0\",\n", - " \"arrow==1.3.0\",\n", - " \"asn1crypto==1.5.1\",\n", - " \"asttokens==2.4.1\",\n", - " \"async-lru==2.0.4\",\n", - " \"atpublic==4.0\",\n", - " \"attrs==23.2.0\",\n", - " \"Babel==2.14.0\",\n", - " \"beautifulsoup4==4.12.3\",\n", - " \"bidict==0.23.1\",\n", - " \"bleach==6.1.0\",\n", - " \"boto3==1.34.69\",\n", - " \"botocore==1.34.69\",\n", - " \"build==1.1.1\",\n", - " \"cachetools==5.3.3\",\n", - " \"certifi==2024.2.2\",\n", - " \"cffi==1.16.0\",\n", - " \"charset-normalizer==3.3.2\",\n", - " \"click==8.1.7\",\n", - " \"cloudpickle==3.0.0\",\n", - " \"colorful==0.5.6\",\n", - " \"comm==0.2.2\",\n", - " \"cryptography==42.0.5\",\n", - " \"dask==2024.3.1\",\n", - " \"debugpy==1.8.1\",\n", - " \"decorator==5.1.1\",\n", - " \"defusedxml==0.7.1\",\n", - " \"dill==0.3.8\",\n", - " \"distlib==0.3.8\",\n", - " \"distributed==2024.3.1\",\n", - " \"dnspython==2.6.1\",\n", - " \"duckdb==0.10.1\",\n", - " \"duckdb_engine==0.11.2\",\n", - " \"executing==2.0.1\",\n", - " \"fastapi==0.110.0\",\n", - " \"fastjsonschema==2.19.1\",\n", - " \"filelock==3.13.2\",\n", - " \"fqdn==1.5.1\",\n", - " \"frozenlist==1.4.1\",\n", - " \"fsspec==2024.3.1\",\n", - " \"ftfy==6.2.0\",\n", - " \"google-api-core==2.18.0\",\n", - " \"google-auth==2.29.0\",\n", - " \"googleapis-common-protos==1.63.0\",\n", - " \"greenlet==3.0.3\",\n", - " \"grpcio==1.62.1\",\n", - " \"h11==0.14.0\",\n", - " \"httpcore==1.0.4\",\n", - " \"httpx==0.27.0\",\n", - " \"huggingface-hub==0.22.0\",\n", - " \"ibis==3.3.0\",\n", - " \"ibis-framework==8.0.0\",\n", - " \"idna==3.6\",\n", - " \"importlib_metadata==7.1.0\",\n", - " \"ipykernel==6.29.3\",\n", - " \"ipython==8.22.2\",\n", - " \"ipython-genutils==0.2.0\",\n", - " \"ipywidgets==8.1.2\",\n", - " \"isoduration==20.11.0\",\n", - " \"jedi==0.19.1\",\n", - " \"Jinja2==3.1.3\",\n", - " \"jmespath==1.0.1\",\n", - " \"joblib==1.3.2\",\n", - " \"json5==0.9.24\",\n", - " \"jsonpointer==2.4\",\n", - " \"jsonschema==4.21.1\",\n", - " \"jsonschema-specifications==2023.12.1\",\n", - " \"jupyter==1.0.0\",\n", - " \"jupyter-console==6.6.3\",\n", - " \"jupyter-events==0.10.0\",\n", - " \"jupyter-lsp==2.2.4\",\n", - " \"jupyter_client==8.6.1\",\n", - " \"jupyter_core==5.7.2\",\n", - " \"jupyter_server==2.13.0\",\n", - " \"jupyter_server_terminals==0.5.3\",\n", - " \"jupyterlab==4.1.5\",\n", - " \"jupyterlab_pygments==0.3.0\",\n", - " \"jupyterlab_server==2.25.4\",\n", - " \"jupyterlab_widgets==3.0.10\",\n", - " \"locket==1.0.0\",\n", - " \"loguru==0.7.2\",\n", - " \"loki-logger-handler==0.1.3\",\n", - " \"markdown-it-py==3.0.0\",\n", - " \"MarkupSafe==2.1.5\",\n", - " \"matplotlib-inline==0.1.6\",\n", - " \"mdurl==0.1.2\",\n", - " \"mistune==3.0.2\",\n", - " \"mongomock==4.1.2\",\n", - " \"mpmath==1.3.0\",\n", - " \"msgpack==1.0.8\",\n", - " \"multidict==6.0.5\",\n", - " \"multipledispatch==1.0.0\",\n", - " \"nbclient==0.10.0\",\n", - " \"nbconvert==7.16.3\",\n", - " \"nbformat==5.10.3\",\n", - " \"nest-asyncio==1.6.0\",\n", - " \"networkx==3.2.1\",\n", - " \"notebook==6.1.5\",\n", - " \"notebook_shim==0.2.4\",\n", - " \"numpy==1.24.4\",\n", - " \"openai-clip==1.0.1\",\n", - " \"opencensus==0.11.4\",\n", - " \"opencensus-context==0.1.3\",\n", - " \"overrides==7.7.0\",\n", - " \"packaging==23.2\",\n", - " \"pandas==2.2.1\",\n", - " \"pandocfilters==1.5.1\",\n", - " \"parso==0.8.3\",\n", - " \"parsy==2.1\",\n", - " \"partd==1.4.1\",\n", - " \"pexpect==4.9.0\",\n", - " \"pgvector==0.2.5\",\n", - " \"pillow==10.2.0\",\n", - " 
\"pip==23.2.1\",\n", - " \"pip-tools==7.4.1\",\n", - " \"platformdirs==3.11.0\",\n", - " \"prettytable==3.10.0\",\n", - " \"prometheus_client==0.20.0\",\n", - " \"prompt-toolkit==3.0.43\",\n", - " \"proto-plus==1.23.0\",\n", - " \"protobuf==4.25.3\",\n", - " \"psutil==5.9.8\",\n", - " \"psycopg==3.1.18\",\n", - " \"psycopg-binary==3.1.18\",\n", - " \"psycopg-pool==3.2.1\",\n", - " \"psycopg2==2.9.9\",\n", - " \"psycopg2-binary==2.9.9\",\n", - " \"ptyprocess==0.7.0\",\n", - " \"pure-eval==0.2.2\",\n", - " \"py-spy==0.3.14\",\n", - " \"pyarrow==15.0.2\",\n", - " \"pyarrow-hotfix==0.6\",\n", - " \"pyasn1==0.5.1\",\n", - " \"pyasn1-modules==0.3.0\",\n", - " \"pycparser==2.21\",\n", - " \"pydantic==2.6.4\",\n", - " \"pydantic_core==2.16.3\",\n", - " \"Pygments==2.17.2\",\n", - " \"PyJWT==2.8.0\",\n", - " \"pylance==0.8.14\",\n", - " \"pymongo==4.6.2\",\n", - " \"pyOpenSSL==24.1.0\",\n", - " \"pyperclip==1.8.2\",\n", - " \"pyproject_hooks==1.0.0\",\n", - " \"python-dateutil==2.9.0.post0\",\n", - " \"python-dotenv==1.0.1\",\n", - " \"python-json-logger==2.0.7\",\n", - " \"pytz==2024.1\",\n", - " \"PyYAML==6.0.1\",\n", - " \"pyzmq==25.1.2\",\n", - " \"qtconsole==5.5.1\",\n", - " \"QtPy==2.4.1\",\n", - " \"ray==2.10.0\",\n", - " \"readerwriterlock==1.0.9\",\n", - " \"referencing==0.34.0\",\n", - " \"regex==2023.12.25\",\n", - " \"requests==2.31.0\",\n", - " \"rfc3339-validator==0.1.4\",\n", - " \"rfc3986-validator==0.1.1\",\n", - " \"rich==13.7.1\",\n", - " \"rpds-py==0.18.0\",\n", - " \"rsa==4.9\",\n", - " \"s3transfer==0.10.1\",\n", - " \"safetensors==0.4.2\",\n", - " \"scikit-learn==1.4.1.post1\",\n", - " \"scipy==1.12.0\",\n", - " \"Send2Trash==1.8.2\",\n", - " \"sentence-transformers==2.6.0\",\n", - " \"sentinels==1.0.0\",\n", - " \"setuptools==65.5.0\",\n", - " \"six==1.16.0\",\n", - " \"smart-open==7.0.3\",\n", - " \"sniffio==1.3.1\",\n", - " \"snowflake-connector-python==3.7.1\",\n", - " \"snowflake-sqlalchemy==1.5.1\",\n", - " \"sortedcontainers==2.4.0\",\n", - " \"soupsieve==2.5\",\n", - " \"SQLAlchemy==2.0.29\",\n", - " \"sqlalchemy-views==0.3.2\",\n", - " \"sqlglot==20.11.0\",\n", - " \"stack-data==0.6.3\",\n", - " \"starlette==0.36.3\",\n", - " \"-e git+https://github.com/makkarss929/superduperdb.git@009bdfa33128a19d10372ded5b47c9f0e052ca8f#egg=superduperdb\",\n", - " \"sympy==1.12\",\n", - " \"tblib==3.0.0\",\n", - " \"tenacity==8.2.3\",\n", - " \"terminado==0.18.1\",\n", - " \"threadpoolctl==3.4.0\",\n", - " \"tinycss2==1.2.1\",\n", - " \"tokenizers==0.15.2\",\n", - " \"tomlkit==0.12.4\",\n", - " \"toolz==0.12.1\",\n", - " \"torch==2.2.1\",\n", - " \"torchvision==0.17.1\",\n", - " \"tornado==6.4\",\n", - " \"tqdm==4.66.2\",\n", - " \"traitlets==5.14.2\",\n", - " \"transformers==4.39.1\",\n", - " \"typer==0.10.0\",\n", - " \"types-python-dateutil==2.9.0.20240316\",\n", - " \"typing_extensions==4.10.0\",\n", - " \"tzdata==2024.1\",\n", - " \"uri-template==1.3.0\",\n", - " \"urllib3==2.2.1\",\n", - " \"uvicorn==0.29.0\",\n", - " \"virtualenv==20.25.1\",\n", - " \"wcwidth==0.2.13\",\n", - " \"webcolors==1.13\",\n", - " \"webencodings==0.5.1\",\n", - " \"websocket-client==1.7.0\",\n", - " \"wheel==0.43.0\",\n", - " \"widgetsnbextension==4.0.10\",\n", - " \"wrapt==1.16.0\",\n", - " \"yarl==1.9.4\",\n", - " \"zict==3.0.0\",\n", - " \"zipp==3.18.1\"\n", - " ],\n", - " \"hostname\": \"Taruns-Laptop.local\",\n", - " \"os_uname\": [\n", - " \"Darwin\",\n", - " \"Taruns-Laptop.local\",\n", - " \"22.1.0\",\n", - " \"Darwin Kernel Version 22.1.0: Sun Oct 9 20:15:09 PDT 2022; 
root:xnu-8792.41.9~2/RELEASE_ARM64_T6000\",\n", - " \"x86_64\"\n", - " ],\n", - " \"package_versions\": {},\n", - " \"platform\": {\n", - " \"platform\": \"macOS-10.16-x86_64-i386-64bit\",\n", - " \"python_version\": \"3.11.5\"\n", - " },\n", - " \"startup_time\": \"2024-04-18 02:14:23.585596\",\n", - " \"superduper_db_root\": \"/Users/tarun/Desktop/superduperDB/superduperdb\",\n", - " \"sys\": {\n", - " \"argv\": [\n", - " \"/Users/tarun/Desktop/superduperDB/superduperdb/superduperdb/__main__.py\",\n", - " \"info\"\n", - " ],\n", - " \"path\": [\n", - " \"/Users/tarun/Desktop/superduperDB/superduperdb/examples\",\n", - " \"/Users/tarun/miniconda3/lib/python311.zip\",\n", - " \"/Users/tarun/miniconda3/lib/python3.11\",\n", - " \"/Users/tarun/miniconda3/lib/python3.11/lib-dynload\",\n", - " \"/Users/tarun/Desktop/superduperDB/superduperdb/.venv/lib/python3.11/site-packages\",\n", - " \"__editable__.superduperdb-0.1.1.finder.__path_hook__\"\n", - " ]\n", - " }\n", - "}\n", - "```\n" - ] - } - ], + "outputs": [], "source": [ "!python -m superduperdb info" ] }, { "cell_type": "code", - "execution_count": 6, - "id": "4403b27e", + "execution_count": null, + "id": "7f4c6a9d", "metadata": {}, "outputs": [], "source": [ @@ -469,20 +151,10 @@ }, { "cell_type": "code", - "execution_count": 7, - "id": "0bc95cfb", + "execution_count": null, + "id": "72785e18", "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - " % Total % Received % Xferd Average Speed Time Time Time Current\n", - " Dload Upload Total Spent Left Speed\n", - "100 737k 100 737k 0 0 332k 0 0:00:02 0:00:02 --:--:-- 333k\n" - ] - } - ], + "outputs": [], "source": [ "# Use !curl to download the 'superduperdb_docs.json' file\n", "!curl -O https://datas-public.s3.amazonaws.com/superduperdb_docs.json\n", @@ -497,30 +169,18 @@ }, { "cell_type": "code", - "execution_count": 8, - "id": "1a99d3af", + "execution_count": null, + "id": "afba211d", "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{'txt': '# Anthropic\\n\\n\\n\\n`superduperdb` allows users to work with `anthropic` API models.\\n\\n\\n\\nRead more about this [here](/docs/docs/walkthrough/ai_models#anthropic).',\n", - " 'link': 'https://docs.superduperdb.com/docs/docs/ai_integrations/anthropic'}" - ] - }, - "execution_count": 8, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "all_chunks_and_links[0]" ] }, { "cell_type": "code", - "execution_count": 9, - "id": "1a55feba", + "execution_count": null, + "id": "1cd41263", "metadata": {}, "outputs": [], "source": [ @@ -532,7 +192,7 @@ }, { "cell_type": "markdown", - "id": "9ae3ce2b", + "id": "97113997", "metadata": {}, "source": [ "## Define Schema and Create table\n", @@ -542,21 +202,10 @@ }, { "cell_type": "code", - "execution_count": 10, - "id": "cafbb553", + "execution_count": null, + "id": "1b5a2243", "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "Schema(identifier='questiondocs-schema', fields={'id': FieldType(identifier='String'), 'txt': FieldType(identifier='String'), 'link': FieldType(identifier='String')})" - ] - }, - "execution_count": 10, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "Schema(\n", " 'questiondocs-schema',\n", @@ -566,21 +215,10 @@ }, { "cell_type": "code", - "execution_count": 11, - "id": "01157866", + "execution_count": null, + "id": "35365286", "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "([], 
Table(identifier='questiondocs'))" - ] - }, - "execution_count": 11, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "# \n", "# Define the 'captions' table\n", @@ -601,8 +239,8 @@ }, { "cell_type": "code", - "execution_count": 12, - "id": "19285213", + "execution_count": null, + "id": "24d34dd5", "metadata": {}, "outputs": [], "source": [ @@ -611,8 +249,8 @@ }, { "cell_type": "code", - "execution_count": 13, - "id": "893ca27f", + "execution_count": null, + "id": "fa9bbad7", "metadata": {}, "outputs": [], "source": [ @@ -621,8 +259,8 @@ }, { "cell_type": "code", - "execution_count": 14, - "id": "e40ac657", + "execution_count": null, + "id": "4ca8f699", "metadata": {}, "outputs": [], "source": [ @@ -631,8 +269,8 @@ }, { "cell_type": "code", - "execution_count": 15, - "id": "5ce19377", + "execution_count": null, + "id": "de8596cf", "metadata": {}, "outputs": [], "source": [ @@ -641,7 +279,7 @@ }, { "cell_type": "raw", - "id": "6a8890af", + "id": "b67d92cf", "metadata": {}, "source": [ "table.insert(df[['id', 'txt', 'link']])" @@ -649,19 +287,10 @@ }, { "cell_type": "code", - "execution_count": 16, - "id": "c9360f32", + "execution_count": null, + "id": "386d1595", "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\u001b[32m 2024-Apr-18 02:14:26.91\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.backends.local.compute\u001b[0m:\u001b[36m34 \u001b[0m | \u001b[1mSubmitting job. function:\u001b[0m\n", - "\u001b[32m 2024-Apr-18 02:14:27.15\u001b[0m| \u001b[32m\u001b[1mSUCCESS \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.backends.local.compute\u001b[0m:\u001b[36m40 \u001b[0m | \u001b[32m\u001b[1mJob submitted on . function: future:16073078-3d35-4e73-90ba-e7a862f6b03a\u001b[0m\n" - ] - } - ], + "outputs": [], "source": [ "insert = table.insert(\n", " [\n", @@ -680,7 +309,7 @@ }, { "cell_type": "raw", - "id": "4b531e01", + "id": "5aeb139c", "metadata": { "scrolled": true }, @@ -690,8 +319,8 @@ }, { "cell_type": "code", - "execution_count": 17, - "id": "f819881e", + "execution_count": null, + "id": "8ee273e7", "metadata": {}, "outputs": [], "source": [ @@ -700,8 +329,8 @@ }, { "cell_type": "code", - "execution_count": 18, - "id": "42b549b6", + "execution_count": null, + "id": "097b7ea6", "metadata": {}, "outputs": [], "source": [ @@ -710,29 +339,17 @@ }, { "cell_type": "code", - "execution_count": 19, - "id": "d2fcdeef", + "execution_count": null, + "id": "34ae0e4a", "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{'txt': '# Anthropic\\n\\n\\n\\n`superduperdb` allows users to work with `anthropic` API models.\\n\\n\\n\\nRead more about this [here](/docs/docs/walkthrough/ai_models#anthropic).',\n", - " 'link': 'https://docs.superduperdb.com/docs/docs/ai_integrations/anthropic'}" - ] - }, - "execution_count": 19, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "result[0]" ] }, { "cell_type": "markdown", - "id": "8715ffd0", + "id": "6b48b0b3", "metadata": {}, "source": [ "A `Model` is a wrapper around a self-built or ecosystem model, such as `torch`, `transformers`, `openai`." 
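The wrapper's `datatype` is also what fixes the Postgres side of the story: a `vector(shape=(1024,))` declaration in Python corresponds to a `vector(1024)` column created for the listener's outputs. Below is a condensed sketch of that table, mirroring the DDL this patch issues in `_model_update_impl`; the `_outputs.example` name is illustrative, since real tables are named `_outputs.<predict_id>`:

```python
import psycopg2

# URI as configured for vector search earlier in this notebook.
conn = psycopg2.connect("postgresql://postgres:test@localhost:8000/qa")
with conn.cursor() as cursor:
    cursor.execute("CREATE EXTENSION IF NOT EXISTS vector")
    # One row per input id, with the embedding in pgvector's native column type.
    cursor.execute(
        """CREATE TABLE IF NOT EXISTS "_outputs.example" (
               _input_id VARCHAR PRIMARY KEY,
               output vector(1024),
               _fold VARCHAR
           );"""
    )
conn.commit()
conn.close()
```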
@@ -740,56 +357,44 @@ }, { "cell_type": "code", - "execution_count": 20, - "id": "06f57c03", + "execution_count": null, + "id": "d03f428a", "metadata": { "scrolled": true }, - "outputs": [ - { - "data": { - "text/plain": [ - "DataType(identifier='vector[1024]', encoder=None, decoder=None, info=None, shape=(1024,), directory=None, encodable='native', bytes_encoding=)" - ] - }, - "execution_count": 20, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "from superduperdb import vector\n", "vector(shape=(1024,))" ] }, { - "cell_type": "code", - "execution_count": 21, - "id": "890e9607", + "cell_type": "markdown", + "id": "dbefeae8", "metadata": {}, - "outputs": [], "source": [ - "import sentence_transformers\n", - "from superduperdb.ext.sentence_transformers import SentenceTransformer\n", - "from superduperdb.ext.numpy import array" + "### Model" ] }, { - "cell_type": "markdown", - "id": "4ec039cb", + "cell_type": "code", + "execution_count": null, + "id": "710c80e9", "metadata": {}, + "outputs": [], "source": [ - "### Model" + "import sentence_transformers\n", + "from superduperdb.ext.sentence_transformers import SentenceTransformer\n", + "from superduperdb.ext.numpy import array" ] }, { "cell_type": "code", - "execution_count": 22, - "id": "c617f81b", + "execution_count": null, + "id": "7db75570", "metadata": {}, "outputs": [], "source": [ - "\n", "model = SentenceTransformer(\n", " identifier=\"embedding\",\n", " object=sentence_transformers.SentenceTransformer(\"BAAI/bge-large-en-v1.5\"),\n", @@ -801,34 +406,12 @@ }, { "cell_type": "code", - "execution_count": 23, - "id": "f02690d7", + "execution_count": null, + "id": "2ef3fb79", "metadata": { "scrolled": true }, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "afeed59560bd4e6c943b9ff065a8cc28", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "Batches: 0%| | 0/1 [00:00, field='vector_index', callable=None)" - ] - }, - "execution_count": 26, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "db.vector_indices" - ] - }, - { - "cell_type": "code", - "execution_count": 27, - "id": "1ca82a99", + "execution_count": null, + "id": "874921eb", "metadata": { "scrolled": true }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\u001b[32m 2024-Apr-18 02:14:37.48\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.components.component\u001b[0m:\u001b[36m374 \u001b[0m | \u001b[1mInitializing DataType : dill\u001b[0m\n", - "\u001b[32m 2024-Apr-18 02:14:37.48\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.components.component\u001b[0m:\u001b[36m377 \u001b[0m | \u001b[1mInitialized DataType : dill successfully\u001b[0m\n", - "\u001b[32m 2024-Apr-18 02:14:50.09\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.backends.local.compute\u001b[0m:\u001b[36m34 \u001b[0m | \u001b[1mSubmitting job. 
function:\u001b[0m\n", - "\u001b[32m 2024-Apr-18 02:14:52.33\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.components.model\u001b[0m:\u001b[36m446 \u001b[0m | \u001b[1mQuery not found in metadata, adding...\u001b[0m\n", - "\u001b[32m 2024-Apr-18 02:14:52.33\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.components.model\u001b[0m:\u001b[36m448 \u001b[0m | \u001b[1mDone\u001b[0m\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "100%|██████████████████████████████████████████████████████████████████████████████████████████████| 992/992 [00:00<00:00, 457526.89it/s]\n" - ] - }, - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "8d3204745a61400b8bad7b8446b94848", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "Batches: 0%| | 0/31 [00:00. function: future:29e8f5e8-18db-4e67-b443-8d550d080cf6\u001b[0m\n" - ] - }, - { - "data": { - "text/plain": [ - "([],\n", - " Listener(identifier='listener1', key='txt', model=SentenceTransformer(preferred_devices=('cuda', 'mps', 'cpu'), device='cpu', identifier='embedding', signature='singleton', datatype=DataType(identifier='vector[1024]', encoder=None, decoder=None, info=None, shape=(1024,), directory=None, encodable='native', bytes_encoding=), output_schema=None, flatten=False, model_update_kwargs={}, predict_kwargs={'show_progress_bar': True}, compute_kwargs={}, object=SentenceTransformer(\n", - " (0): Transformer({'max_seq_length': 512, 'do_lower_case': True}) with Transformer model: BertModel \n", - " (1): Pooling({'word_embedding_dimension': 1024, 'pooling_mode_cls_token': True, 'pooling_mode_mean_tokens': False, 'pooling_mode_max_tokens': False, 'pooling_mode_mean_sqrt_len_tokens': False, 'pooling_mode_weightedmean_tokens': False, 'pooling_mode_lasttoken': False, 'include_prompt': True})\n", - " (2): Normalize()\n", - " ), model='embedding', preprocess=None, postprocess= at 0x1a3d7efc0>), select=, active=True, predict_kwargs={'max_chunk_size': 3000}))" - ] - }, - "execution_count": 27, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "db.add(listener1)" ] }, + { + "cell_type": "markdown", + "id": "b26a2742", + "metadata": {}, + "source": [ + "## HNSW (Hierarchical Navigable Small Worlds graph) Indexing\n", + "\n", + "1. HNSW Indexing - Multi layer graph structure\n", + "2. IVFFlat Indexing - is based on clustering\n", + "\n", + "\n", + "> Note : `indexing_measure` and `measure` both should use same similarity approaches. Otherwise it will go for sequential scanning." 
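Concretely, the note means: Postgres will only use an HNSW or IVFFlat index when the distance operator in the query matches the operator class the index was built with. A sketch against a hypothetical `items` table with an `output vector(3)` column (all names illustrative; the index parameters match the `HNSW(m=16, ef_construction=64, ef_search=49)` settings used below):

```python
import psycopg2

conn = psycopg2.connect("postgresql://postgres:test@localhost:8000/qa")
with conn.cursor() as cursor:
    # Build the index for cosine distance.
    cursor.execute(
        """CREATE INDEX ON items USING hnsw (output vector_cosine_ops)
           WITH (m = 16, ef_construction = 64);"""
    )
    cursor.execute("SET hnsw.ef_search = 49;")
    # Uses the index: <=> is cosine distance, which matches vector_cosine_ops.
    cursor.execute(
        "SELECT id FROM items ORDER BY output <=> '[0.1, 0.2, 0.3]' LIMIT 5;"
    )
    # Does NOT use the index: <-> is L2 distance, so Postgres falls back
    # to a sequential scan over every row.
    cursor.execute(
        "SELECT id FROM items ORDER BY output <-> '[0.1, 0.2, 0.3]' LIMIT 5;"
    )
conn.commit()
conn.close()
```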
+ ] + }, { "cell_type": "code", - "execution_count": 28, - "id": "8b63c4b7", + "execution_count": null, + "id": "24bdc026", "metadata": {}, "outputs": [], "source": [ - "from superduperdb import VectorIndex" + "from superduperdb import VectorIndex\n", + "from superduperdb.vector_search.postgres import PostgresVectorSearcher, HNSW, IVFFlat\n", + "\n", + "hnsw_indexing = HNSW(m=16, ef_construction=64, ef_search=49)\n", + "ivfflat_indexing = IVFFlat(lists=100, probes=1)" ] }, { "cell_type": "code", - "execution_count": 29, - "id": "b3b57380", + "execution_count": null, + "id": "a0056bd2", "metadata": {}, "outputs": [], "source": [ "vi = VectorIndex(\n", " identifier='my-index', # Unique identifier for the VectorIndex\n", " indexing_listener=listener1, # Listener to be used for indexing documents\n", - " measure='cosine'\n", + " measure='cosine',\n", + " indexing = hnsw_indexing,\n", + " indexing_measure = 'vector_cosine_ops'\n", ")" ] }, { "cell_type": "code", - "execution_count": 30, - "id": "b4326418", + "execution_count": null, + "id": "93636671", "metadata": { - "scrolled": true + "scrolled": false }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\u001b[32m 2024-Apr-18 02:27:43.92\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.backends.local.compute\u001b[0m:\u001b[36m34 \u001b[0m | \u001b[1mSubmitting job. function:\u001b[0m\n", - "\u001b[32m 2024-Apr-18 02:27:43.92\u001b[0m| \u001b[32m\u001b[1mSUCCESS \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.backends.local.compute\u001b[0m:\u001b[36m40 \u001b[0m | \u001b[32m\u001b[1mJob submitted on . function: future:bae6a1be-697e-437d-addd-8084463a392f\u001b[0m\n" - ] - } - ], + "outputs": [], "source": [ "jobs, _ = db.add(vi)" ] }, { "cell_type": "markdown", - "id": "2bbbf861", + "id": "ea3293b8", "metadata": {}, "source": [ "## Inference" @@ -2024,415 +526,12 @@ }, { "cell_type": "code", - "execution_count": 38, - "id": "c1e60e9a", + "execution_count": null, + "id": "b1365988", "metadata": { "scrolled": false }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\u001b[32m 2024-Apr-18 03:06:23.01\u001b[0m| \u001b[1mINFO \u001b[0m | \u001b[36mTaruns-Laptop.local\u001b[0m| \u001b[36msuperduperdb.base.datalayer\u001b[0m:\u001b[36m974 \u001b[0m | \u001b[1m{}\u001b[0m\n" - ] - }, - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "b19d57e46bf040d086cff320d67fd997", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "Batches: 0%| | 0/1 [00:00" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/markdown": [ - "```\n", - "\n", - "\n", - "\n", - "Read more about the `VectorIndex` concept [here](../walkthrough/vector_search.md).\n" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/plain": [ - "'https://docs.superduperdb.com/docs/docs/fundamentals/procedural_vs_declarative_api'" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/markdown": [ - "---" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/markdown": [ - "the vectors calculated by the `Listener`, and, is fitted\n", - "\n", - "based on those vectors and the label set.\n", - "\n", - "\n", - "\n", - "```python\n", - "\n", - "from sklearn.svm import SVC\n", - "\n", - "from my_models.vision import MyTorchModule, 
prepare_image\n", - "\n", - "\n", - "\n", - "from superduperdb.ext.numpy import array\n", - "\n", - "from superduperdb.ext.sklearn import Estimator\n", - "\n", - "from superduperdb.ext.torch import TorchModel\n", - "\n", - "from superduperdb import Stack, VectorIndex, Listener\n", - "\n", - "from superduperdb.backends.mongodb import Collection\n", - "\n", - "\n", - "\n", - "collection = Collection('images')\n", - "\n", - "\n", - "\n", - "my_listener=Listener(\n", - "\n", - " 'my-listener',\n", - "\n", - " model=TorchModel(\n", - "\n", - " 'my-cnn-vectorizer',\n", - "\n", - " object=MyTorchModule(),\n", - "\n", - " preprocess=prepare_image,\n", - "\n", - " postprocess=lambda x: x.numpy(),\n", - "\n", - " encoder=array(dtype='float', shape=(512,))\n", - "\n", - " )\n", - "\n", - " key='img',\n" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/plain": [ - "'https://docs.superduperdb.com/docs/docs/walkthrough/creating_stacks_of_functionality'" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/markdown": [ - "---" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/markdown": [ - "from superduperdb.ext.torch import TorchModel\n", - "\n", - "from superduperdb import Stack, VectorIndex, Listener\n", - "\n", - "from superduperdb.backends.mongodb import Collection\n", - "\n", - "\n", - "\n", - "collection = Collection('images')\n", - "\n", - "\n", - "\n", - "my_listener=Listener(\n", - "\n", - " 'my-listener',\n", - "\n", - " model=TorchModel(\n", - "\n", - " 'my-cnn-vectorizer',\n", - "\n", - " object=MyTorchModule(),\n", - "\n", - " preprocess=prepare_image,\n", - "\n", - " postprocess=lambda x: x.numpy(),\n", - "\n", - " encoder=array(dtype='float', shape=(512,))\n", - "\n", - " )\n", - "\n", - " key='img',\n", - "\n", - " select=collection.find({'_fold': 'train'})\n", - "\n", - ")\n", - "\n", - "\n", - "\n", - "db.add(\n", - "\n", - " Stack(\n", - "\n", - " 'my-stack',\n", - "\n", - " [\n", - "\n", - " my_listener,\n", - "\n", - " VectorIndex(\n" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/plain": [ - "'https://docs.superduperdb.com/docs/docs/walkthrough/creating_stacks_of_functionality'" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/markdown": [ - "---" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/markdown": [ - "\n", - "\n", - "collection = Collection('images')\n", - "\n", - "\n", - "\n", - "my_listener=Listener(\n", - "\n", - " 'my-listener',\n", - "\n", - " model=TorchModel(\n", - "\n", - " 'my-cnn-vectorizer',\n", - "\n", - " object=MyTorchModule(),\n", - "\n", - " preprocess=prepare_image,\n", - "\n", - " postprocess=lambda x: x.numpy(),\n", - "\n", - " encoder=array(dtype='float', shape=(512,))\n", - "\n", - " )\n", - "\n", - " key='img',\n", - "\n", - " select=collection.find({'_fold': 'train'})\n", - "\n", - ")\n", - "\n", - "\n", - "\n", - "db.add(\n", - "\n", - " Stack(\n", - "\n", - " 'my-stack',\n", - "\n", - " [\n", - "\n", - " my_listener,\n", - "\n", - " VectorIndex(\n", - "\n", - " 'my-index',\n", - "\n", - " indexing_listener=my_listener,\n", - "\n", - " ),\n" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/plain": [ - 
"'https://docs.superduperdb.com/docs/docs/walkthrough/creating_stacks_of_functionality'" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/markdown": [ - "---" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/markdown": [ - "my_listener=Listener(\n", - "\n", - " 'my-listener',\n", - "\n", - " model=TorchModel(\n", - "\n", - " 'my-cnn-vectorizer',\n", - "\n", - " object=MyTorchModule(),\n", - "\n", - " preprocess=prepare_image,\n", - "\n", - " postprocess=lambda x: x.numpy(),\n", - "\n", - " encoder=array(dtype='float', shape=(512,))\n", - "\n", - " )\n", - "\n", - " key='img',\n", - "\n", - " select=collection.find({'_fold': 'train'})\n", - "\n", - ")\n", - "\n", - "\n", - "\n", - "db.add(\n", - "\n", - " Stack(\n", - "\n", - " 'my-stack',\n", - "\n", - " [\n", - "\n", - " my_listener,\n", - "\n", - " VectorIndex(\n", - "\n", - " 'my-index',\n", - "\n", - " indexing_listener=my_listener,\n", - "\n", - " ),\n", - "\n", - " Estimator(\n", - "\n", - " 'my-classifier',\n", - "\n", - " object=SVC()\n" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/plain": [ - "'https://docs.superduperdb.com/docs/docs/walkthrough/creating_stacks_of_functionality'" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/markdown": [ - "---" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "CPU times: user 1.33 s, sys: 273 ms, total: 1.6 s\n", - "Wall time: 360 ms\n" - ] - } - ], + "outputs": [], "source": [ "%%time\n", "from superduperdb.backends.ibis import Table\n", @@ -2460,21 +559,12 @@ }, { "cell_type": "markdown", - "id": "fa31f8be", + "id": "f6a3c179", "metadata": {}, "source": [ "## Future Works\n", - "1. HNSW\n", - "2. IVFFlat " + "1. `Ibis` doesn't support `pgvector`. and want to make it supportable for that `pgvector`." 
] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "766edab3", - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { diff --git a/superduperdb/backends/ibis/query.py b/superduperdb/backends/ibis/query.py index 8c81bbce7..a9c4c0ac0 100644 --- a/superduperdb/backends/ibis/query.py +++ b/superduperdb/backends/ibis/query.py @@ -4,6 +4,7 @@ import types import typing as t from superduperdb import CFG +from pgvector.psycopg2 import register_vector import psycopg2 import pandas @@ -75,6 +76,7 @@ def _model_update_impl( elif CFG.cluster.vector_search.type == 'pg_vector': # Connect to your PostgreSQL database conn = psycopg2.connect(CFG.cluster.vector_search.uri) + register_vector(conn) table_name = f'_outputs.{predict_id}' with conn.cursor() as cursor: cursor.execute('CREATE EXTENSION IF NOT EXISTS vector') diff --git a/superduperdb/components/vector_index.py b/superduperdb/components/vector_index.py index 9552a14ff..b87c82c08 100644 --- a/superduperdb/components/vector_index.py +++ b/superduperdb/components/vector_index.py @@ -4,6 +4,10 @@ import numpy as np from overrides import override +from superduperdb import CFG +import psycopg2 +from pgvector.psycopg2 import register_vector + from superduperdb.base.datalayer import Datalayer from superduperdb.base.document import Document from superduperdb.components.component import Component @@ -16,6 +20,7 @@ from superduperdb.misc.special_dicts import MongoStyleDict from superduperdb.vector_search.base import VectorIndexMeasureType from superduperdb.vector_search.update_tasks import copy_vectors +from superduperdb.vector_search.postgres import PostgresIndexing, HNSW, IVFFlat KeyType = t.Union[str, t.List, t.Dict] if t.TYPE_CHECKING: @@ -42,6 +47,8 @@ class VectorIndex(Component): compatible_listener: t.Optional[Listener] = None measure: VectorIndexMeasureType = VectorIndexMeasureType.cosine metric_values: t.Optional[t.Dict] = dc.field(default_factory=dict) + indexing : t.Optional[HNSW | IVFFlat] = None, + indexing_measure : t.Optional[PostgresIndexing] = PostgresIndexing.cosine @override def on_load(self, db: Datalayer) -> None: @@ -54,6 +61,26 @@ def on_load(self, db: Datalayer) -> None: self.compatible_listener = t.cast( Listener, db.load('listener', self.compatible_listener) ) + if CFG.cluster.vector_search.type == "pg_vector": + conn = psycopg2.connect(CFG.cluster.vector_search.uri) + table_name = f"_outputs.{self.indexing_listener.predict_id}" + with conn.cursor() as cursor: + if self.indexing.name == 'hnsw': + + cursor.execute(f"""CREATE INDEX ON "{table_name}" + USING {self.indexing.name} (output {self.indexing_measure}) + WITH (m = {self.indexing.m}, ef_construction = {self.indexing.ef_construction});""") + + cursor.execute("""SET %s.ef_search = %s;""" % (self.indexing.name, self.indexing.ef_search)) + elif self.indexing.name == 'ivfflat': + cursor.execute(f"""CREATE INDEX ON "{table_name}" + USING %s (output %s) + WITH (lists = %s);""" % (self.indexing.name, self.indexing_measure, self.indexing.lists)) + + cursor.execute("""SET %s.probes = %s;""" % (self.indexing.name, self.indexing.probes)) + conn.commit() + conn.close() + def get_vector( self, diff --git a/superduperdb/vector_search/postgres.py b/superduperdb/vector_search/postgres.py index 85fb052da..f090955c3 100644 --- a/superduperdb/vector_search/postgres.py +++ b/superduperdb/vector_search/postgres.py @@ -1,8 +1,11 @@ import json import typing as t import numpy +import dataclasses as dc +from pgvector.psycopg2 import register_vector import psycopg2 + from 
superduperdb import CFG, logging if t.TYPE_CHECKING: from superduperdb.components.vector_index import VectorIndex @@ -12,12 +15,13 @@ from superduperdb.vector_search.base import BaseVectorSearcher, VectorItem, VectorIndexMeasureType +@dc.dataclass(kw_only=True) class PostgresIndexing: cosine = "vector_cosine_ops" l2 = "vector_l2_ops" inner_product = "vector_ip_ops" - +@dc.dataclass(kw_only=True) class IVFFlat(PostgresIndexing): """ An IVFFlat index divides vectors into lists, and then searches a subset of those lists that are closest to the query vector. @@ -31,6 +35,7 @@ def __init__(self, lists: t.Optional[int] = 100, probes: t.Optional[int] = 1): self.lists = lists self.probes = probes +@dc.dataclass(kw_only=True) class HNSW(PostgresIndexing): """ An HNSW index creates a multilayer graph. It has better query performance than IVFFlat (in terms of speed-recall tradeoff), @@ -117,8 +122,10 @@ def _create_or_append_to_dataset(self, vectors, ids): self.connection.commit() def _create_index(self): + print("_create_index") with self.connection.cursor() as cursor: if self.indexing.name == 'hnsw': + print("hnsw") cursor.execute("""CREATE INDEX ON %s USING %s (output %s) WITH (m = %s, ef_construction = %s);""" % (self.identifier, self.indexing.name, self.indexing_measure, self.indexing.m, self.indexing.ef_construction)) @@ -130,7 +137,7 @@ def _create_index(self): WITH (lists = %s);""" % (self.identifier, self.indexing.name, self.indexing_measure, self.indexing.lists)) cursor.execute("""SET %s.probes = %s;""" % (self.indexing.name, self.indexing.probes)) - + print("_create_index") self.connection.commit()
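Taken together, the read path this patch implements comes down to a single round trip: bind the query embedding, order by the chosen distance expression ascending, and take the first `n` ids. A sketch of the equivalent query, using the cosine operator that `measure='cosine'` maps to (table name and dimension illustrative; the production code assembles this string in `find_nearest_from_array`):

```python
import numpy as np
import psycopg2
from pgvector.psycopg2 import register_vector

conn = psycopg2.connect("postgresql://postgres:test@localhost:8000/qa")
register_vector(conn)

h = np.random.rand(1024).astype(np.float32)  # stand-in for the query embedding
with conn.cursor() as curr:
    # With an HNSW index built on vector_cosine_ops, this ORDER BY is index-backed.
    curr.execute(
        'SELECT _input_id, output <=> %s AS distance '
        'FROM "_outputs.example" ORDER BY distance ASC LIMIT %s;',
        (h, 5),
    )
    nearest = curr.fetchall()
ids = [row[0] for row in nearest]
conn.close()
```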