From 7346fdf7abda39755a421931a6c759792c0d17e3 Mon Sep 17 00:00:00 2001
From: Jonny Dixon
Date: Wed, 15 Jan 2025 12:55:26 +0000
Subject: [PATCH] Update kafka.py

---
 .../datahub/ingestion/source/kafka/kafka.py   | 72 ++++++++++++++-----
 1 file changed, 56 insertions(+), 16 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
index 4a20760cb5e20..718f45192710a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
@@ -1,5 +1,6 @@
 import base64
 import concurrent.futures
+import io
 import json
 import logging
 import random
@@ -7,6 +8,7 @@
 from datetime import datetime, timedelta
 from typing import Any, Dict, Iterable, List, Optional, Set, Type, cast
 
+import avro.io
 import avro.schema
 import confluent_kafka
 import confluent_kafka.admin
@@ -412,29 +414,63 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                     "subject", f"Exception while extracting topic {subject}: {e}"
                 )
 
-    def _process_message_part(self, data: Any, prefix: str) -> Optional[Any]:
-        """Process either key or value part of a message."""
+    def _process_message_part(
+        self, data: Any, prefix: str, topic: str, is_key: bool = False
+    ) -> Optional[Any]:
+        """Process either key or value part of a message using schema registry for decoding."""
         if data is None:
             return None
 
         if isinstance(data, bytes):
             try:
-                # Try JSON decode first
-                decoded = json.loads(data.decode("utf-8"))
-                if isinstance(decoded, (dict, list)):
-                    # Flatten nested structures
-                    if isinstance(decoded, list):
-                        # Convert list to dict before flattening
-                        decoded = {"item": decoded}
-                    return flatten_json(decoded)
-                return decoded
-            except Exception:
-                # If JSON fails, try to decode as string, then fallback to base64
+                # Get schema metadata
+                schema_metadata = self.schema_registry_client.get_schema_metadata(
+                    topic, make_data_platform_urn(self.platform), False
+                )
+
+                if schema_metadata and isinstance(
+                    schema_metadata.platformSchema, KafkaSchemaClass
+                ):
+                    schema_str = (
+                        schema_metadata.platformSchema.keySchema
+                        if is_key
+                        else schema_metadata.platformSchema.documentSchema
+                    )
+
+                    if schema_str:
+                        try:
+                            # Parse schema and create reader
+                            schema = avro.schema.parse(schema_str)
+                            # Decode Avro data - first 5 bytes are magic byte and schema ID
+                            decoder = avro.io.BinaryDecoder(io.BytesIO(data[5:]))
+                            reader = avro.io.DatumReader(schema)
+                            decoded_value = reader.read(decoder)
+
+                            if isinstance(decoded_value, (dict, list)):
+                                # Flatten nested structures
+                                if isinstance(decoded_value, list):
+                                    decoded_value = {"item": decoded_value}
+                                return flatten_json(decoded_value)
+                            return decoded_value
+                        except Exception as e:
+                            logger.warning(f"Failed to decode Avro message: {e}")
+
+                # Fallback to JSON decode if no schema or Avro decode fails
                 try:
-                    return data.decode("utf-8")
+                    decoded = json.loads(data.decode("utf-8"))
+                    if isinstance(decoded, (dict, list)):
+                        if isinstance(decoded, list):
+                            decoded = {"item": decoded}
+                        return flatten_json(decoded)
+                    return decoded
                 except Exception:
+                    # If JSON fails, use base64 as last resort
                     return base64.b64encode(data).decode("utf-8")
 
+            except Exception as e:
+                logger.warning(f"Failed to process message part: {e}")
+                return base64.b64encode(data).decode("utf-8")
+
         return data
 
     def get_sample_messages(self, topic: str) -> List[Dict[str, Any]]:
@@ -464,8 +500,12 @@ def get_sample_messages(self, topic: str) -> List[Dict[str, Any]]:
             # Process both key and value
             key = msg.key()
             value = msg.value()
-            processed_key = self._process_message_part(key, "key")
-            processed_value = self._process_message_part(value, "value")
+            processed_key = self._process_message_part(
+                key, "key", topic, is_key=True
+            )
+            processed_value = self._process_message_part(
+                value, "value", topic, is_key=False
+            )
 
             msg_timestamp = msg.timestamp()[1]
             timestamp_dt = datetime.fromtimestamp(
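
Note on the decode path above: the new `_process_message_part` strips the 5-byte Confluent wire-format header (one magic byte plus a big-endian 4-byte schema ID) before handing the remaining bytes to an Avro `DatumReader`, falling back to JSON and then base64 when that fails. The following self-contained sketch illustrates that framing round-trip outside DataHub; the `User` schema, the `encode_confluent`/`decode_confluent` helpers, and the schema ID 42 are hypothetical stand-ins for what the schema registry would supply, not part of this patch.

# Minimal sketch (assumed names; not from the patch): round-trip a
# Confluent-framed Avro message with the same "skip 5 header bytes,
# then DatumReader" logic used in _process_message_part above.
import io
import json
import struct

import avro.io
import avro.schema

# Hypothetical writer schema, standing in for
# schema_metadata.platformSchema.documentSchema.
SCHEMA_STR = json.dumps(
    {
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": "int"},
        ],
    }
)


def encode_confluent(
    record: dict, schema: avro.schema.Schema, schema_id: int
) -> bytes:
    # Confluent wire format: 0x00 magic byte, 4-byte big-endian schema ID,
    # then the Avro binary body.
    buf = io.BytesIO()
    avro.io.DatumWriter(schema).write(record, avro.io.BinaryEncoder(buf))
    return b"\x00" + struct.pack(">I", schema_id) + buf.getvalue()


def decode_confluent(data: bytes, schema: avro.schema.Schema) -> dict:
    # Mirrors the patch: the first 5 bytes are the magic byte and schema ID,
    # so the Avro-encoded payload starts at offset 5.
    decoder = avro.io.BinaryDecoder(io.BytesIO(data[5:]))
    return avro.io.DatumReader(schema).read(decoder)


if __name__ == "__main__":
    schema = avro.schema.parse(SCHEMA_STR)
    framed = encode_confluent({"name": "alice", "age": 30}, schema, schema_id=42)
    assert decode_confluent(framed, schema) == {"name": "alice", "age": 30}

Slicing `data[5:]` unconditionally is fine for registry-framed messages; a stricter variant would first check `data[0] == 0` before trusting the header, which is why the patch keeps the JSON and base64 fallbacks for payloads that turn out not to be Avro.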