Skip to content

Commit

Permalink
Update kafka.py
Browse files (browse the repository at this point in the history)
  • Loading branch information
acrylJonny committed Jan 16, 2025
1 parent 120e022 commit 51f2cfc
Showing 1 changed file with 40 additions and 8 deletions.
48 changes: 40 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,19 +508,34 @@ def _create_profile_class(
fieldPath=field_path,
# Required fields with fallbacks
sampleValues=random.sample(samples, min(3, len(samples)))
if samples
if samples and self.source_config.profiling.include_field_sample_values
else None,
nullCount=row_count - len(samples)
if self.source_config.profiling.include_field_null_count
else None,
uniqueCount=len(set(samples))
if self.source_config.profiling.include_field_distinct_count
else None,
nullCount=row_count - len(samples),
uniqueCount=len(set(samples)),
# Numeric statistics
min=str(min(numeric_samples)) if numeric_samples else None,
max=str(max(numeric_samples)) if numeric_samples else None,
mean=str(statistics.mean(numeric_samples)) if numeric_samples else None,
min=str(min(numeric_samples))
if numeric_samples
and self.source_config.profiling.include_field_min_value
else None,
max=str(max(numeric_samples))
if numeric_samples
and self.source_config.profiling.include_field_max_value
else None,
mean=str(statistics.mean(numeric_samples))
if numeric_samples
and self.source_config.profiling.include_field_mean_value
else None,
median=str(statistics.median(numeric_samples))
if numeric_samples
and self.source_config.profiling.include_field_median_value
else None,
stdev=str(statistics.stdev(numeric_samples))
if len(numeric_samples) > 1
and self.source_config.profiling.include_field_stddev_value
else None,
# Quartile statistics
quantiles=[
Expand All @@ -532,6 +547,7 @@ def _create_profile_class(
),
]
if len(sorted_samples) >= 4
and self.source_config.profiling.include_field_quantiles
else None,
# Set unused fields to None
histogram=None,
Expand All @@ -553,7 +569,7 @@ def _create_profile_class(
),
# Add partition specification
partitionSpec=PartitionSpecClass(
partition="SAMPLE",
partition=f"SAMPLE ({self.source_config.profiling.sample_size}/{self.source_config.profiling.max_sample_time_seconds} seconds)",
type=PartitionTypeClass.QUERY,
timePartition=TimeWindowClass(
startTimeMillis=timestamp_millis,
Expand Down Expand Up @@ -600,9 +616,25 @@ def _process_message(
key = msg.key() if callable(msg.key) else msg.key
value = msg.value() if callable(msg.value) else msg.value

# Fix timestamp handling
msg_timestamp = msg.timestamp()[1]
try:
# If timestamp is in milliseconds (> 1e10), convert to seconds
if msg_timestamp > 1e10:
msg_timestamp = msg_timestamp / 1000.0
# Handle potential out of range timestamps
if (
msg_timestamp < 0 or msg_timestamp > 2147483647
): # Max unix timestamp
msg_timestamp = datetime.now().timestamp()
timestamp_str = datetime.fromtimestamp(msg_timestamp).isoformat()
except (ValueError, OSError, OverflowError):
# Fallback to current time if timestamp conversion fails
timestamp_str = datetime.now().isoformat()

sample = {
"offset": msg.offset(),
"timestamp": datetime.fromtimestamp(msg.timestamp()[1]).isoformat(),
"timestamp": timestamp_str,
}

# Process key if present
Expand Down

0 comments on commit 51f2cfc

Please sign in to comment.