Skip to content

Commit

Permalink
feat(ingest): Couchbase source fixed retry logic
Browse files Browse the repository at this point in the history
  • Loading branch information
mminichino committed Jan 16, 2025
1 parent 9af84b3 commit cdec644
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from couchbase.auth import PasswordAuthenticator
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster
from couchbase.diagnostics import ServiceType
from couchbase.exceptions import BucketNotFoundException, InternalServerFailureException
from couchbase.management.buckets import BucketManager, BucketSettings
from couchbase.management.collections import CollectionManager, ScopeSpec
Expand All @@ -17,6 +18,7 @@
ClusterTimeoutOptions,
LockMode,
TLSVerifyMode,
WaitUntilReadyOptions,
)

from datahub.ingestion.source.couchbase.retry import retry
Expand Down Expand Up @@ -68,7 +70,12 @@ def __init__(
@retry()
def connect(self) -> Cluster:
cluster = Cluster.connect(self.cb_connect_string, self.cluster_options)
cluster.wait_until_ready(timedelta(seconds=10))
cluster.wait_until_ready(
timedelta(seconds=10),
WaitUntilReadyOptions(
service_types=[ServiceType.KeyValue, ServiceType.Query]
),
)
return cluster

@retry(always_raise_list=(BucketNotFoundException,))
Expand All @@ -84,7 +91,12 @@ async def connect_async(self) -> AsyncCluster:
self.cb_connect_string, self.cluster_options
)
await cluster.on_connect()
await cluster.wait_until_ready(timedelta(seconds=10))
await cluster.wait_until_ready(
timedelta(seconds=10),
WaitUntilReadyOptions(
service_types=[ServiceType.KeyValue, ServiceType.Query]
),
)
return cluster

@retry(always_raise_list=(BucketNotFoundException,))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
from functools import wraps
from typing import Callable, Tuple, Type, Union

from couchbase.exceptions import CouchbaseException

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -38,7 +36,7 @@ def f_wrapper(*args, **kwargs):
for retry_number in range(retry_count + 1):
try:
return func(*args, **kwargs)
except CouchbaseException as err:
except Exception as err:
if always_raise_list and isinstance(err, always_raise_list):
raise

Check warning on line 41 in metadata-ingestion/src/datahub/ingestion/source/couchbase/retry.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/couchbase/retry.py#L39-L41

Added lines #L39 - L41 were not covered by tests

Expand Down Expand Up @@ -66,7 +64,7 @@ async def f_wrapper(*args, **kwargs):
for retry_number in range(retry_count + 1):
try:
return await func(*args, **kwargs)
except CouchbaseException as err:
except Exception as err:
if always_raise_list and isinstance(err, always_raise_list):
raise

Check warning on line 69 in metadata-ingestion/src/datahub/ingestion/source/couchbase/retry.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/couchbase/retry.py#L67-L69

Added lines #L67 - L69 were not covered by tests

Expand Down

0 comments on commit cdec644

Please sign in to comment.