From cdec644f840fb773d3244eb7a9cb3cbfaa72b2d1 Mon Sep 17 00:00:00 2001 From: Michael Minichino <36450266+mminichino@users.noreply.github.com> Date: Wed, 15 Jan 2025 18:09:17 -0600 Subject: [PATCH] feat(ingest): Couchbase source fixed retry logic --- .../source/couchbase/couchbase_connect.py | 16 ++++++++++++++-- .../datahub/ingestion/source/couchbase/retry.py | 6 ++---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_connect.py b/metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_connect.py index 08e3165207cf1a..485b9b83e07210 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_connect.py @@ -9,6 +9,7 @@ from couchbase.auth import PasswordAuthenticator from couchbase.bucket import Bucket from couchbase.cluster import Cluster +from couchbase.diagnostics import ServiceType from couchbase.exceptions import BucketNotFoundException, InternalServerFailureException from couchbase.management.buckets import BucketManager, BucketSettings from couchbase.management.collections import CollectionManager, ScopeSpec @@ -17,6 +18,7 @@ ClusterTimeoutOptions, LockMode, TLSVerifyMode, + WaitUntilReadyOptions, ) from datahub.ingestion.source.couchbase.retry import retry @@ -68,7 +70,12 @@ def __init__( @retry() def connect(self) -> Cluster: cluster = Cluster.connect(self.cb_connect_string, self.cluster_options) - cluster.wait_until_ready(timedelta(seconds=10)) + cluster.wait_until_ready( + timedelta(seconds=10), + WaitUntilReadyOptions( + service_types=[ServiceType.KeyValue, ServiceType.Query] + ), + ) return cluster @retry(always_raise_list=(BucketNotFoundException,)) @@ -84,7 +91,12 @@ async def connect_async(self) -> AsyncCluster: self.cb_connect_string, self.cluster_options ) await cluster.on_connect() - await cluster.wait_until_ready(timedelta(seconds=10)) + await cluster.wait_until_ready( + timedelta(seconds=10), + WaitUntilReadyOptions( + service_types=[ServiceType.KeyValue, ServiceType.Query] + ), + ) return cluster @retry(always_raise_list=(BucketNotFoundException,)) diff --git a/metadata-ingestion/src/datahub/ingestion/source/couchbase/retry.py b/metadata-ingestion/src/datahub/ingestion/source/couchbase/retry.py index 619e62e155d6a8..6f3548dfebc348 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/couchbase/retry.py +++ b/metadata-ingestion/src/datahub/ingestion/source/couchbase/retry.py @@ -5,8 +5,6 @@ from functools import wraps from typing import Callable, Tuple, Type, Union -from couchbase.exceptions import CouchbaseException - logger = logging.getLogger(__name__) @@ -38,7 +36,7 @@ def f_wrapper(*args, **kwargs): for retry_number in range(retry_count + 1): try: return func(*args, **kwargs) - except CouchbaseException as err: + except Exception as err: if always_raise_list and isinstance(err, always_raise_list): raise @@ -66,7 +64,7 @@ async def f_wrapper(*args, **kwargs): for retry_number in range(retry_count + 1): try: return await func(*args, **kwargs) - except CouchbaseException as err: + except Exception as err: if always_raise_list and isinstance(err, always_raise_list): raise