-
Notifications
You must be signed in to change notification settings - Fork 472
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Postgres : pgvector implemenation #1926
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here are some advice
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good start. What about indexing? It would be very useful if we could use that to speed up the calculations: https://github.com/pgvector/pgvector?tab=readme-ov-file#indexing.
@jieguangzhou @blythed @kartik4949 I have made all requested changes, please review my PR. thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @makkarss929
Some suggestions for modifications, mainly we don’t need to modify so much, our architecture can handle these logics.
Please help to clean the outputs of the notebook
superduperdb/backends/ibis/query.py
Outdated
if CFG.cluster.vector_search.type == 'in_memory': | ||
if flatten: | ||
raise NotImplementedError('Flatten not yet supported for ibis') | ||
|
||
if not outputs: | ||
return | ||
|
||
table_records = [] | ||
for ix in range(len(outputs)): | ||
d = { | ||
'_input_id': str(ids[ix]), | ||
'output': outputs[ix], | ||
} | ||
table_records.append(d) | ||
if not outputs: | ||
return | ||
|
||
for r in table_records: | ||
if isinstance(r['output'], dict) and '_content' in r['output']: | ||
r['output'] = r['output']['_content']['bytes'] | ||
table_records = [] | ||
for ix in range(len(outputs)): | ||
d = { | ||
'_input_id': str(ids[ix]), | ||
'output': outputs[ix], | ||
} | ||
table_records.append(d) | ||
|
||
for r in table_records: | ||
if isinstance(r['output'], dict) and '_content' in r['output']: | ||
r['output'] = r['output']['_content']['bytes'] | ||
|
||
db.databackend.insert(f'_outputs.{predict_id}', table_records) | ||
|
||
elif CFG.cluster.vector_search.type == 'pg_vector': | ||
# Connect to your PostgreSQL database | ||
conn = psycopg2.connect(CFG.cluster.vector_search.uri) | ||
table_name = f'_outputs.{predict_id}' | ||
with conn.cursor() as cursor: | ||
cursor.execute('CREATE EXTENSION IF NOT EXISTS vector') | ||
cursor.execute(f"""DROP TABLE IF EXISTS "{table_name}";""") | ||
cursor.execute( | ||
f"""CREATE TABLE "{table_name}" ( | ||
_input_id VARCHAR PRIMARY KEY, | ||
output vector(1024), | ||
_fold VARCHAR | ||
); | ||
""" | ||
) | ||
for ix in range(len(outputs)): | ||
try: | ||
cursor.execute( | ||
f"""INSERT INTO "{table_name}" (_input_id, output) VALUES (%s, %s);""", | ||
[str(ids[ix]), outputs[ix]] | ||
) | ||
except: | ||
pass | ||
|
||
db.databackend.insert(f'_outputs.{predict_id}', table_records) | ||
# Commit the transaction | ||
conn.commit() | ||
# Close the connection | ||
conn.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don’t need to insert them into the table here. When vector search is really launched, the add method of PostgresVectorSearcher will be called, and at this time, the corresponding vector can be accepted and added to the table. It is necessary to manage duplicate items well as previously commented.
The results of the model are saved separately from vector search service
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jieguangzhou Don't worry about duplication, we have _input_id
as a PRIMARY KEY while creating vector table
Example _outputs.listener1::0
. I have checked there is no duplication.
elif uri.startswith('postgres://') or uri.startswith("postgresql://"): | ||
name = uri.split('//')[0] | ||
if type == 'data_backend': | ||
ibis_conn = ibis.connect(uri) | ||
return mapping['ibis'](ibis_conn, name) | ||
else: | ||
assert type == 'metadata' | ||
from sqlalchemy import create_engine | ||
|
||
sql_conn = create_engine(uri) | ||
return mapping['sqlalchemy'](sql_conn, name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove this, we can directly load the URI with ibis
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will remove this
elif item.startswith('postgres://') or item.startswith('postgresql://'): | ||
kwargs['data_backend'] = item |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove this, the same reason
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will remove this
if CFG.cluster.vector_search.type != 'pg_vector': | ||
if not db.server_mode: | ||
request_server( | ||
service='vector_search', | ||
endpoint='create/search', | ||
args={ | ||
'vector_index': self.vector_index, | ||
}, | ||
type='get', | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to change this, this is for an independent vector search service, in which the pg vector will be used logically.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jieguangzhou It's throwing error that's why, I did this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What error? Could you please put the error message here?
if CFG.cluster.vector_search.type != 'pg_vector': | ||
response = request_server( | ||
service='vector_search', | ||
data=h, | ||
endpoint='query/search', | ||
args={'vector_index': self.vector_index, 'n': n}, | ||
) | ||
return response['ids'], response['scores'] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jieguangzhou It's throwing error that's why, I did this
if CFG.cluster.vector_search.type != 'pg_vector': | ||
vi = db.vector_indices[vector_index] | ||
if isinstance(query, dict): | ||
# ruff: noqa: E501 | ||
query: CompoundSelect = Serializable.decode(query) # type: ignore[no-redef] | ||
assert isinstance(query, CompoundSelect) | ||
if not ids: | ||
select = query | ||
else: | ||
select = query.select_using_ids(ids) | ||
docs = db.select(select) | ||
docs = [doc.unpack() for doc in docs] | ||
key = vi.indexing_listener.key | ||
if '_outputs.' in key: | ||
key = key.split('.')[1] | ||
# TODO: Refactor the below logic | ||
vectors = [] | ||
if isinstance(db.databackend, MongoDataBackend): | ||
vectors = [ | ||
{ | ||
'vector': MongoStyleDict(doc)[ | ||
f'_outputs.{vi.indexing_listener.predict_id}' | ||
], | ||
'id': str(doc['_id']), | ||
} | ||
for doc in docs | ||
] | ||
elif isinstance(db.databackend, IbisDataBackend): | ||
docs = db.execute(select.outputs(vi.indexing_listener.predict_id)) | ||
from superduperdb.backends.ibis.data_backend import INPUT_KEY | ||
|
||
vectors = [ | ||
{ | ||
'vector': doc[f'_outputs.{vi.indexing_listener.predict_id}'], | ||
'id': str(doc[INPUT_KEY]), | ||
} | ||
for doc in docs | ||
] | ||
for r in vectors: | ||
if hasattr(r['vector'], 'numpy'): | ||
r['vector'] = r['vector'].numpy() | ||
vectors = [ | ||
{ | ||
'vector': doc[f'_outputs.{vi.indexing_listener.predict_id}'], | ||
'id': str(doc[INPUT_KEY]), | ||
} | ||
for doc in docs | ||
] | ||
for r in vectors: | ||
if hasattr(r['vector'], 'numpy'): | ||
r['vector'] = r['vector'].numpy() | ||
|
||
if vectors: | ||
db.fast_vector_searchers[vi.identifier].add( | ||
[VectorItem(**vector) for vector in vectors] | ||
) | ||
if vectors: | ||
db.fast_vector_searchers[vi.identifier].add( | ||
[VectorItem(**vector) for vector in vectors] | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jieguangzhou Remember we discussed in meeting Ibis
is not compatible with pgvector
whereas psycopg2
supports. It's throwing error when Ibis
is trying to access table with vector embeddings created with psycopg2
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should have two tables when using the pg vector
search
- The model output table(
table1
), all the embedding vectors results are saved here - the pg vector search table(
table2
), save the vectors, and build an index here
The workflow for building a vector search index is as follows:
- Query the vectors from
table1
- Use these vectors to build the vector search(
the add method of PostgresVectorSearcher
)
As you can see here, we get the vectors
and call the add db.fast_vector_searchers[vi.identifier].add
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, you want me to create 2 tables
outputs.listener1::0
which is created byibis
.outputs.listener1::0_pgvector
which will be created bypsycopg2
. with theadd method of PostgresVectorSearcher
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, just one table, the pgvector table created by psycopg2
The first one created by ibis data_backend automatically
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jieguangzhou why do we need 2 tables. Why can't we just use the original listener table?
My thought was that the table will be created as a pg_vector table when the listener is created
somehow? Copying data into another table sounds wasteful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be viewed in conjunction with comment #1926 (comment) ; here are my thoughts:
- The saving behavior of model outputs should not be limited by downstream applications (such as vector search). If we need to compatible vector search separately, it would increase the complexity of saving query results. This should be a very pure interface. If vector search is to be built, then an index should be built on this table on the vector search side (if an index cannot be built, a separate table needs to be created).
- The vector search component should be independent. Ideally, we should be able to switch the underlying engine of my vector search at any time. If we couple vector search with saving, then expandability will decrease. If my vector search uses two different Postgres databases from databackend (one for data saving and one for vector search calculations), it cannot be supported.
- Ideally, if the PostgresVectorSearcher can determine that the uri being used is the same as the databackend's uri, it should first attempt to build an index in the model output table. If building an index is not possible, then a separate table needs to be created.
WDYT? @blythed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand that developers could potentially use pg_vector as an external vector-store. However, most likely
users will be postgres users.
By hosting the vector-search with pg_vector, we avoid the problem of mapping the data to a new table/ database. This is exactly like MongoDB Atlas vector-search.
In the configuration with have CFG.cluster.vector_search.type = 'native'
for such cases. If that is set, then the "copy across" the vectors jobs are skipped.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just conducted a basic test with pgvector:
- If a vector table is created using pgvector, ibis currently cannot directly read the data inside, as it throws an error for not being able to parse the vector type data of pgvector. Adaptations are necessary; otherwise, our data backend will no longer be able to access this table. I suspect that it would require activating the vector feature specifically for the ibis backend’s connection, which could be quite troublesome.
- To create and access vector type data in pgvector, each connection needs to activate the vector feature to enable data interchange.
It is suggested to split it into two tables, as ibis does not seem to be well compatible with pgvector at the moment.
If merging into one table is considered, the following tests need to be conducted: - Test with two types of SQL databases (Postgres and SQLite) to ensure that non-Postgres vector implementations are compatible and do not affect the original SQL functionalities. Since SQLite already has unit tests, only Postgres needs to be tested.
- Conduct application tests in cases where the vector search type is specified as pgvector, including vector searches and non-vector searches, to ensure that non-vector search functions are preserved normally. These tests should be integration tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -54,6 +61,26 @@ def on_load(self, db: Datalayer) -> None: | |||
self.compatible_listener = t.cast( | |||
Listener, db.load('listener', self.compatible_listener) | |||
) | |||
if CFG.cluster.vector_search.type == "pg_vector": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic should be isolated in the PgVector class (vector_searcher).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@blythed you want me to move this logic to PosgresVectorSearcher
class, and then call it from here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic should be isolated in the PgVector class (vector_searcher).
@blythed you want me to move this logic to PosgresVectorSearcher
class, and then call it from here. right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@blythed working on it
elif isinstance(db.databackend, IbisDataBackend): | ||
docs = db.execute(select.outputs(vi.indexing_listener.predict_id)) | ||
from superduperdb.backends.ibis.data_backend import INPUT_KEY | ||
if CFG.cluster.vector_search.type != 'pg_vector': |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because of same error Ibis
doesn't support pgvector
and vector
datatype
b8748a3
to
04010cf
Compare
@makkarss929 the ibis project has fixed the data types for |
…entation
Description
PostgresDataBackend
fromIbis.DataBackend
PostgresVectorSearcher
class inspired byMongoAtlasVectorSearcher
.Next steps :
Related Issues
Checklist
make unit-testing
andmake integration-testing
successfully?Additional Notes or Comments