From eade7dd251e732dfb517bfcbe83746d7ca295f9a Mon Sep 17 00:00:00 2001
From: Jennifer Hamon
Date: Wed, 13 Nov 2024 09:14:06 -0500
Subject: [PATCH] Expose connection_pool_maxsize on Index and add docstrings
 (#415)

## Problem

To explore the impact on performance, I want to expose a configuration kwarg for `connection_pool_maxsize` on `Index`.

## Solution

The `connection_pool_maxsize` value is passed to `urllib3.PoolManager` as `maxsize`. This parameter controls how many connections are cached for a given host. If we use a large number of threads to increase parallelism while this maxsize value stays relatively small, we incur unnecessary overhead establishing and discarding connections beyond the number being cached.

By default `connection_pool_maxsize` is set to `multiprocessing.cpu_count() * 5`. In Google Colab, the CPU count is only 2, so this default is fairly limiting.

### Usage

```python
from pinecone import Pinecone

pc = Pinecone(api_key='key')
index = pc.Index(
    host="jen1024-dojoi3u.svc.apw5-4e34-81fa.pinecone.io",
    pool_threads=25,
    connection_pool_maxsize=25
)
```

## Type of Change

- [x] New feature (non-breaking change which adds functionality)

## Test Plan

I ran some local performance tests and saw that this does have an impact on performance.
---
 pinecone/control/pinecone.py |  8 ++++++++
 pinecone/data/index.py       | 45 +++++++++++++++++++++++++++++++++++++
 2 files changed, 53 insertions(+)

diff --git a/pinecone/control/pinecone.py b/pinecone/control/pinecone.py
index 7e245ec3..cd49d87f 100644
--- a/pinecone/control/pinecone.py
+++ b/pinecone/control/pinecone.py
@@ -765,6 +765,14 @@ def Index(self, name: str = "", host: str = "", **kwargs):
             # Now you're ready to perform data operations
             index.query(vector=[...], top_k=10)
         ```
+
+        Arguments:
+            name: The name of the index to target. If you specify the name of the index, the client will
+                fetch the host url from the Pinecone control plane.
+            host: The host url of the index to target. If you specify the host url, the client will use
+                the host url directly without making any additional calls to the control plane.
+            pool_threads: The number of threads to use when making parallel requests by calling index methods with optional kwarg async_req=True, or using methods that make use of parallelism automatically such as query_namespaces(). Default: 1
+            connection_pool_maxsize: The maximum number of connections to keep in the connection pool. Default: 5 * multiprocessing.cpu_count()
         """
         if name == "" and host == "":
             raise ValueError("Either name or host must be specified")
diff --git a/pinecone/data/index.py b/pinecone/data/index.py
index f2c4c9f9..52628511 100644
--- a/pinecone/data/index.py
+++ b/pinecone/data/index.py
@@ -105,6 +105,9 @@ def __init__(
         self._openapi_config = ConfigBuilder.build_openapi_config(self.config, openapi_config)
         self._pool_threads = pool_threads
 
+        if kwargs.get("connection_pool_maxsize", None):
+            self._openapi_config.connection_pool_maxsize = kwargs.get("connection_pool_maxsize")
+
         self._vector_api = setup_openapi_client(
             api_client_klass=ApiClient,
             api_klass=DataPlaneApi,
@@ -512,6 +515,48 @@ def query_namespaces(
         ] = None,
         **kwargs,
     ) -> QueryNamespacesResults:
+        """The query_namespaces() method is used to make a query to multiple namespaces in parallel and combine the results into one result set.
+
+        Since several asynchronous calls are made on your behalf when calling this method, you will need to tune the pool_threads and connection_pool_maxsize parameters of the Index constructor to suit your workload.
+
+        Examples:
+
+        ```python
+        from pinecone import Pinecone
+
+        pc = Pinecone(api_key="your-api-key")
+        index = pc.Index(
+            host="index-name",
+            pool_threads=32,
+            connection_pool_maxsize=32
+        )
+
+        query_vec = [0.1, 0.2, 0.3] # An embedding that matches the index dimension
+        combined_results = index.query_namespaces(
+            vector=query_vec,
+            namespaces=['ns1', 'ns2', 'ns3', 'ns4'],
+            top_k=10,
+            filter={'genre': {"$eq": "drama"}},
+            include_values=True,
+            include_metadata=True
+        )
+        for vec in combined_results.matches:
+            print(vec.id, vec.score)
+        print(combined_results.usage)
+        ```
+
+        Args:
+            vector (List[float]): The query vector, must be the same length as the dimension of the index being queried.
+            namespaces (List[str]): The list of namespaces to query.
+            top_k (Optional[int], optional): The number of results you would like to request from each namespace. Defaults to 10.
+            filter (Optional[Dict[str, Union[str, float, int, bool, List, dict]]], optional): Pass an optional filter to filter results based on metadata. Defaults to None.
+            include_values (Optional[bool], optional): Boolean field indicating whether vector values should be included with results. Defaults to None.
+            include_metadata (Optional[bool], optional): Boolean field indicating whether vector metadata should be included with results. Defaults to None.
+            sparse_vector (Optional[Union[SparseValues, Dict[str, Union[List[float], List[int]]]]], optional): If you are working with a dotproduct index, you can pass a sparse vector as part of your hybrid search. Defaults to None.
+
+        Returns:
+            QueryNamespacesResults: A QueryNamespacesResults object containing the combined results from all namespaces, as well as the combined usage cost in read units.
+        """
         if namespaces is None or len(namespaces) == 0:
             raise ValueError("At least one namespace must be specified")
         if len(vector) == 0: