update testing and fixing formats
acrylJonny committed Jan 19, 2025
1 parent dda7dac commit 999822a
Showing 6 changed files with 166 additions and 1,714 deletions.
121 changes: 81 additions & 40 deletions metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
@@ -6,7 +6,7 @@
import math
import random
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timedelta
from typing import Any, Dict, Iterable, List, Optional, Set, Type, cast

import avro.io
@@ -81,6 +81,8 @@
HistogramClass,
KafkaSchemaClass,
OwnershipSourceTypeClass,
PartitionSpecClass,
PartitionTypeClass,
QuantileClass,
SchemaMetadataClass,
StatusClass,
@@ -494,10 +496,10 @@ def create_profile_data(
multiple=self.profiler_config.max_sample_time_seconds,
),
# Add partition specification
# partitionSpec=PartitionSpecClass(
# partition=f"SAMPLE ({self.profiler_config.sample_size}/{self.profiler_config.max_sample_time_seconds} seconds)",
# type=PartitionTypeClass.PARTITION,
# ),
partitionSpec=PartitionSpecClass(
partition=f"SAMPLE ({str(self.profiler_config.sample_size)} samples / {str(self.profiler_config.max_sample_time_seconds)} seconds)",
type=PartitionTypeClass.QUERY,
),
fieldProfiles=field_profiles,
)
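For context, here is a minimal sketch of how a sample-based partition spec like the one added above slots into a profile aspect. It is illustrative only: the sample_size and timing values are placeholders, not the connector's exact code.

```python
# Illustrative sketch: emit a DatasetProfile whose partitionSpec records that the
# statistics come from a bounded sample rather than a full scan of the topic.
import time

from datahub.metadata.schema_classes import (
    DatasetProfileClass,
    PartitionSpecClass,
    PartitionTypeClass,
)

sample_size = 100              # assumed profiler setting
max_sample_time_seconds = 60   # assumed profiler setting

profile = DatasetProfileClass(
    timestampMillis=int(time.time() * 1000),
    partitionSpec=PartitionSpecClass(
        partition=f"SAMPLE ({sample_size} samples / {max_sample_time_seconds} seconds)",
        type=PartitionTypeClass.QUERY,
    ),
)
```

Using PartitionTypeClass.QUERY rather than PARTITION signals that the profile describes a query-scoped sample instead of a physical partition of the topic.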

@@ -509,6 +511,8 @@ def get_kafka_consumer(
{
"group.id": "datahub-kafka-ingestion",
"bootstrap.servers": connection.bootstrap,
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
**connection.consumer_config,
}
)
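In isolation, the two new consumer settings amount to the sketch below (the bootstrap address is a placeholder): `auto.offset.reset=earliest` lets the profiler start from the beginning of a partition when its group has no committed offsets, and `enable.auto.commit=False` keeps those reads from committing any offsets for the group.

```python
# Minimal sketch with a placeholder bootstrap address: a consumer that can read
# historical messages for profiling without committing offsets for its group.
import confluent_kafka

consumer = confluent_kafka.Consumer(
    {
        "group.id": "datahub-kafka-ingestion",
        "bootstrap.servers": "localhost:9092",  # placeholder
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
    }
)
```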
@@ -789,7 +793,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
def _process_message_part(
self, data: Any, prefix: str, topic: str, is_key: bool = False
) -> Optional[Any]:
"""Process either key or value part of a message using schema registry for decoding."""
if data is None:
return None

@@ -811,21 +814,23 @@

if schema_str:
try:
# Parse schema and create reader
schema = avro.schema.parse(schema_str)
# Decode Avro data - first 5 bytes are magic byte and schema ID
decoder = avro.io.BinaryDecoder(io.BytesIO(data[5:]))
reader = avro.io.DatumReader(schema)
decoded_value = reader.read(decoder)

if isinstance(decoded_value, (dict, list)):
# Flatten nested structures
if isinstance(decoded_value, list):
decoded_value = {"item": decoded_value}
return flatten_json(decoded_value)
return decoded_value
# Check if this is Avro data (has magic byte)
if len(data) > 5 and data[0] == 0: # Magic byte check
schema = avro.schema.parse(schema_str)
decoder = avro.io.BinaryDecoder(io.BytesIO(data[5:]))
reader = avro.io.DatumReader(schema)
decoded_value = reader.read(decoder)

if isinstance(decoded_value, (dict, list)):
# Flatten nested structures
if isinstance(decoded_value, list):
decoded_value = {"item": decoded_value}

return flatten_json(decoded_value)
return decoded_value
except Exception as e:
logger.warning(f"Failed to decode Avro message: {e}")
self.report.report_warning(

"Failed to decode Avro message for topic", topic, exc=e
)

# Fallback to JSON decode if no schema or Avro decode fails
try:
@@ -846,6 +851,7 @@ def _process_message_part(
return data
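The magic-byte branch above targets the Confluent wire format: byte 0 is the magic byte (0x00), bytes 1-4 carry the schema-registry ID, and the Avro-encoded record begins at byte 5. A self-contained sketch, with a made-up schema and schema ID, of decoding a payload the same way _process_message_part does:

```python
# Illustrative only: round-trip a record through the Confluent wire format
# (magic byte + 4-byte schema ID + Avro body) and decode it from byte 5 onward.
import io
import struct

import avro.io
import avro.schema

schema = avro.schema.parse(
    '{"type": "record", "name": "Example", "fields": [{"name": "id", "type": "int"}]}'
)

# Encode roughly as a Confluent serializer would; schema ID 1 is made up.
buf = io.BytesIO()
avro.io.DatumWriter(schema).write({"id": 42}, avro.io.BinaryEncoder(buf))
data = b"\x00" + struct.pack(">I", 1) + buf.getvalue()

# Decode: check the magic byte, skip the 5-byte header, read the Avro payload.
if len(data) > 5 and data[0] == 0:
    decoder = avro.io.BinaryDecoder(io.BytesIO(data[5:]))
    decoded = avro.io.DatumReader(schema).read(decoder)
    print(decoded)  # {'id': 42}
```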


def get_sample_messages(self, topic: str) -> List[Dict[str, Any]]:
"""Get sample messages including historical data"""
samples: List[Dict[str, Any]] = []
try:
# Get metadata for all partitions
@@ -854,25 +860,45 @@ def get_sample_messages(self, topic: str) -> List[Dict[str, Any]]:
confluent_kafka.TopicPartition(topic, p)
for p in topic_metadata.partitions.keys()
]
self.consumer.assign(partitions)
logger.debug(f"Assigned to topic {topic}")

max_polls = 10 # Limit number of poll attempts
polls = 0
# For each partition, seek to an earlier offset
for partition in partitions:
# Get the end offset
low, high = self.consumer.get_watermark_offsets(partition)
# Ensure we get enough messages
messages_per_partition = max(
self.source_config.profiling.sample_size // len(partitions),
10, # Minimum 10 messages per partition
)
# Start from either beginning or calculated position
start_offset = max(low, high - messages_per_partition)
partition.offset = start_offset
logger.debug(
f"Setting partition {partition.partition} offset to {start_offset} "
f"(low={low}, high={high})"
)

self.consumer.assign(partitions)
logger.debug(f"Assigned to topic {topic} with specific offsets")

end_time = datetime.now() + timedelta(
seconds=float(self.source_config.profiling.max_sample_time_seconds)
)
while (
len(samples) < self.source_config.profiling.sample_size
and polls < max_polls
and datetime.now() < end_time
):
polls += 1
msg = self.consumer.poll(timeout=1.0)

if msg is None:
continue


if msg.error():
logger.warning(f"Error while consuming from {topic}: {msg.error()}")
break
self.report.report_warning(

"profiling",
f"Error while consuming from {topic}: {msg.error()}",
)
continue


try:
key = msg.key() if callable(msg.key) else msg.key
@@ -908,15 +934,21 @@ def get_sample_messages(self, topic: str) -> List[Dict[str, Any]]:
samples.append(sample)

except Exception as e:
logger.warning(f"Failed to process message: {e}")
self.report.report_warning(

"profiling", f"Failed to process message: {str(e)}"
)

except Exception as e:
logger.warning(f"Failed to collect samples from {topic}: {e}")
self.report.report_warning(

"profiling", f"Failed to collect samples from {topic}: {str(e)}"
)
finally:
try:
self.consumer.unassign()
except Exception as e:
logger.warning(f"Failed to unassign consumer: {e}")
self.report.report_warning(

"profiling", f"Failed to unassign consumer: {str(e)}"
)

return samples
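Taken together, the sampling changes boil down to: seek each partition back far enough to cover its share of the requested sample, then poll until either the sample size or the time budget is reached. A standalone sketch of the offset math is below; the helper function and its arguments are hypothetical, only the watermark-based lookback mirrors the diff.

```python
# Hypothetical helper mirroring the per-partition lookback above: start
# `per_partition` messages before the high watermark, but never before the low one.
import confluent_kafka


def assign_with_lookback(
    consumer: confluent_kafka.Consumer, topic: str, sample_size: int
) -> None:
    metadata = consumer.list_topics(topic, timeout=10)
    partitions = [
        confluent_kafka.TopicPartition(topic, p)
        for p in metadata.topics[topic].partitions.keys()
    ]
    per_partition = max(sample_size // max(len(partitions), 1), 10)
    for tp in partitions:
        low, high = consumer.get_watermark_offsets(tp)
        tp.offset = max(low, high - per_partition)
    consumer.assign(partitions)
```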

@@ -1007,12 +1039,19 @@ def create_profiling_wu(
"""Create samples work unit incorporating both schema fields and sample values."""
# Only proceed if profiling is enabled
if not self.source_config.profiling.enabled:
self.report.report_warning(

"Profiling not enabled for topic",
topic,
)
return


samples = self.get_sample_messages(topic)
if not samples:
self.report.report_warning("No samples collected for topic", topic)
return


self.report.info(f"Collected {len(samples)} samples for topic", topic)

# Respect sample size limit if configured
if self.source_config.profiling.limit:
samples = samples[: self.source_config.profiling.limit]

Check warning on line 1057 in metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py#L1057

Added line #L1057 was not covered by tests
@@ -1158,10 +1197,10 @@ def _extract_record(
custom_props = self.build_custom_properties(
topic, topic_detail, extra_topic_config
)
schema_name: Optional[
str
] = self.schema_registry_client._get_subject_for_topic(
topic, is_key_schema=False
schema_name: Optional[str] = (
self.schema_registry_client._get_subject_for_topic(
topic, is_key_schema=False
)
)
if schema_name is not None:
custom_props["Schema Name"] = schema_name
Expand Down Expand Up @@ -1311,11 +1350,13 @@ def fetch_extra_topic_details(self, topics: List[str]) -> Dict[str, dict]:

def fetch_topic_configurations(self, topics: List[str]) -> Dict[str, dict]:
logger.info("Fetching config details for all topics")
configs: Dict[
ConfigResource, concurrent.futures.Future
] = self.admin_client.describe_configs(
resources=[ConfigResource(ConfigResource.Type.TOPIC, t) for t in topics],
request_timeout=self.source_config.connection.client_timeout_seconds,
configs: Dict[ConfigResource, concurrent.futures.Future] = (
self.admin_client.describe_configs(
resources=[
ConfigResource(ConfigResource.Type.TOPIC, t) for t in topics
],
request_timeout=self.source_config.connection.client_timeout_seconds,
)
)
logger.debug("Waiting for config details futures to complete")
concurrent.futures.wait(configs.values())
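Once concurrent.futures.wait() returns, each future can be resolved individually. A minimal sketch of reading the results, with a placeholder broker address and topic name:

```python
# Illustrative sketch: resolve the per-resource futures returned by describe_configs.
from confluent_kafka.admin import AdminClient, ConfigResource

admin_client = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder
futures = admin_client.describe_configs(
    [ConfigResource(ConfigResource.Type.TOPIC, "my-topic")]  # placeholder topic
)

topic_configs = {}
for resource, future in futures.items():
    try:
        # Each future resolves to a mapping of config name -> ConfigEntry.
        topic_configs[resource.name] = future.result()
    except Exception as e:
        print(f"Failed to fetch config for {resource.name}: {e}")
```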