Skip to content

Commit

Permalink
feat(ingest): Couchbase source extra error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
mminichino committed Jan 15, 2025
1 parent 32f019c commit 9af84b3
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

from acouchbase.collection import AsyncCollection
from acouchbase.scope import AsyncScope
from couchbase.exceptions import DocumentNotFoundException
from couchbase.result import GetResult

from datahub.ingestion.source.couchbase.couchbase_connect import CouchbaseConnect
from datahub.ingestion.source.couchbase.retry import retry

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -36,9 +38,16 @@ async def init(self):
self.keyspace
)

async def collection_get(self, key: str) -> GetResult:
return await self.collection.get(key)
@retry()
async def collection_get(self, key: str) -> dict:
    """Fetch one document by key and return its content as a dict.

    Args:
        key: ID of the document to read from ``self.collection``.

    Returns:
        The document body decoded via ``GetResult.content_as[dict]``, or an
        empty dict when the document does not exist.

    A missing document is logged as a warning and is not raised to the
    caller. Any other exception propagates to the ``retry()`` decorator
    (its retry policy is defined elsewhere -- NOTE(review): confirm which
    exceptions it retries).
    """
    # Only the fetch itself is guarded: that is the call expected to raise
    # DocumentNotFoundException. Decoding stays outside the try so an
    # unrelated failure there is never mistaken for a missing document.
    try:
        result: GetResult = await self.collection.get(key)
    except DocumentNotFoundException:
        logger.warning(f"Document ID {key} not found")
        return {}
    return result.content_as[dict]

@retry()
async def get_keys(self):
query = f"select meta().id from {self.connector.collection_name}"
if self.max_sample_size > 0:
Expand Down Expand Up @@ -75,8 +84,9 @@ async def get_documents(self) -> AsyncGenerator[List[dict], None]:
if isinstance(result, Exception):
logger.error(result)
errors += 1
elif isinstance(result, GetResult):
batch.append(result.content_as[dict])
elif isinstance(result, dict):
if result:
batch.append(result)
yield batch

if errors > 0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def bucket_info(self, bucket_name: str) -> BucketSettings:
settings: BucketSettings = self.bucket_manager.get_bucket(bucket_name)
return settings

@retry()
def collection_infer(
self, sample_size: int, sample_values: int, keyspace: str
) -> dict:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import shutil
import time
from pathlib import Path

import pytest
Expand Down Expand Up @@ -62,6 +63,8 @@ def test_couchbase_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_ti

assert response.status_code == 200

time.sleep(2)

# Run the metadata ingestion pipeline.
pipeline = Pipeline.create(
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import base64
import logging

Expand Down Expand Up @@ -56,6 +57,8 @@ async def test_couchbase_driver(

assert response.status_code == 200

await asyncio.sleep(2)

# Run the driver test.
couchbase_connect = CouchbaseConnect(
"couchbases://127.0.0.1", "Administrator", "password", 5, 60
Expand Down

0 comments on commit 9af84b3

Please sign in to comment.