Skip to content

Commit

Permalink
Update kafka.py
Browse files Browse the repository at this point in the history
  • Loading branch information
acrylJonny committed Jan 15, 2025
1 parent 2debbf1 commit 7346fdf
Showing 1 changed file with 56 additions and 16 deletions.
72 changes: 56 additions & 16 deletions metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import base64
import concurrent.futures
import io
import json
import logging
import random
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict, Iterable, List, Optional, Set, Type, cast

import avro.io
import avro.schema
import confluent_kafka
import confluent_kafka.admin
Expand Down Expand Up @@ -412,29 +414,63 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
"subject", f"Exception while extracting topic {subject}: {e}"
)

def _process_message_part(
    self, data: Any, prefix: str, topic: str, is_key: bool = False
) -> Optional[Any]:
    """Decode one part (key or value) of a sampled Kafka message.

    Decoding strategy, in order:
      1. If the topic has schema metadata in the schema registry and the
         payload matches the Confluent wire format (magic byte ``0x00`` +
         4-byte schema id + Avro payload), decode it with the registered
         Avro schema.
      2. Otherwise — or if Avro decoding fails — try UTF-8 JSON.
      3. As a last resort, return the raw bytes base64-encoded so the
         sample stays serializable.

    Dict/list results are flattened with ``flatten_json``; top-level lists
    are wrapped as ``{"item": ...}`` first since the flattener expects a dict.

    Args:
        data: Raw message part. ``bytes`` are decoded; ``None`` passes
            through; anything else is returned unchanged (already
            deserialized by the consumer).
        prefix: Label for the part ("key"/"value"). Kept for interface
            compatibility; currently unused.
        topic: Kafka topic name used for the schema-registry lookup.
        is_key: True to select the key schema, False for the value schema.

    Returns:
        The decoded (possibly flattened) value, a base64 string fallback,
        or ``None`` for ``None`` input.
    """
    if data is None:
        return None
    if not isinstance(data, bytes):
        # Already a plain Python value; nothing to decode.
        return data

    try:
        schema_metadata = self.schema_registry_client.get_schema_metadata(
            topic, make_data_platform_urn(self.platform), False
        )
        if schema_metadata and isinstance(
            schema_metadata.platformSchema, KafkaSchemaClass
        ):
            schema_str = (
                schema_metadata.platformSchema.keySchema
                if is_key
                else schema_metadata.platformSchema.documentSchema
            )
            # Confluent wire format: 1 magic byte (0x00) + 4-byte schema id
            # + Avro payload. Only attempt Avro decoding when the payload
            # actually matches it; this avoids a guaranteed-to-fail decode
            # (and a noisy warning) for plain JSON/string payloads on
            # topics that happen to have a registered schema.
            if schema_str and len(data) > 5 and data[0] == 0:
                try:
                    schema = avro.schema.parse(schema_str)
                    decoder = avro.io.BinaryDecoder(io.BytesIO(data[5:]))
                    decoded_value = avro.io.DatumReader(schema).read(decoder)
                    if isinstance(decoded_value, (dict, list)):
                        # Flatten nested structures; wrap lists for the flattener.
                        if isinstance(decoded_value, list):
                            decoded_value = {"item": decoded_value}
                        return flatten_json(decoded_value)
                    return decoded_value
                except Exception as e:
                    # Fall through to the JSON attempt below.
                    logger.warning("Failed to decode Avro message: %s", e)

        # Fallback: treat the payload as UTF-8 JSON.
        try:
            decoded = json.loads(data.decode("utf-8"))
            if isinstance(decoded, (dict, list)):
                if isinstance(decoded, list):
                    decoded = {"item": decoded}
                return flatten_json(decoded)
            return decoded
        except Exception:
            # Not JSON either; base64 keeps arbitrary bytes representable.
            return base64.b64encode(data).decode("utf-8")

    except Exception as e:
        # Schema-registry lookup (or anything unexpected) failed; never let
        # sampling crash ingestion — degrade to a base64 representation.
        logger.warning("Failed to process message part: %s", e)
        return base64.b64encode(data).decode("utf-8")

def get_sample_messages(self, topic: str) -> List[Dict[str, Any]]:
Expand Down Expand Up @@ -464,8 +500,12 @@ def get_sample_messages(self, topic: str) -> List[Dict[str, Any]]:
# Process both key and value
key = msg.key()
value = msg.value()
processed_key = self._process_message_part(key, "key")
processed_value = self._process_message_part(value, "value")
processed_key = self._process_message_part(

Check warning on line 503 in metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py#L501-L503

Added lines #L501 - L503 were not covered by tests
key, "key", topic, is_key=True
)
processed_value = self._process_message_part(

Check warning on line 506 in metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py#L506

Added line #L506 was not covered by tests
value, "value", topic, is_key=False
)

msg_timestamp = msg.timestamp()[1]
timestamp_dt = datetime.fromtimestamp(

Check warning on line 511 in metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py#L510-L511

Added lines #L510 - L511 were not covered by tests
Expand Down

0 comments on commit 7346fdf

Please sign in to comment.