Skip to content

Commit

Permalink
Reintroduce random sampling to bring in more samples. Alter partition spec to …
Browse files Browse the repository at this point in the history
…be clearer
  • Loading branch information
acrylJonny committed Jan 19, 2025
1 parent 521d87c commit 96df646
Showing 1 changed file with 8 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import math
import random
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
Expand Down Expand Up @@ -180,19 +181,14 @@ def _calculate_numeric_stats(

return stats

def _get_sample_values(self, values: List[Any], max_samples: int = 3) -> List[str]:
"""Get representative sample values"""
def _get_sample_values(self, values: List[Any], max_samples: int = 20) -> List[str]:
if not values:
return []

Check warning on line 186 in metadata-ingestion/src/datahub/ingestion/source/kafka/kafka_profiler.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/kafka/kafka_profiler.py#L186

Added line #L186 was not covered by tests

try:
# Try to get values from start, middle, and end
indices = [0]
if len(values) > 1:
indices.append(len(values) // 2)
if len(values) > 2:
indices.append(-1)

# Take a random sample up to max_samples size
sample_size = min(max_samples, len(values))
indices = sorted(random.sample(range(len(values)), sample_size))
samples = [str(values[i]) for i in indices]
samples = [s[:1000] for s in samples] # Limit string length
return samples
Expand Down Expand Up @@ -441,7 +437,7 @@ def _process_field_statistics(
for v in non_null_values
if not (isinstance(v, float) and math.isnan(v))
and not is_special_value(v)
]
],
)
if self.profiler_config.include_field_sample_values
else [],
Expand Down Expand Up @@ -625,8 +621,8 @@ def create_profile_data(
),
# Add partition specification
partitionSpec=PartitionSpecClass(
partition=f"SAMPLE ({str(self.profiler_config.sample_size)} samples / {str(self.profiler_config.max_sample_time_seconds)} seconds)",
type=PartitionTypeClass.PARTITION,
partition=f"SAMPLE ({str(sample_count)} samples)",
type=PartitionTypeClass.QUERY,
),
fieldProfiles=field_profiles,
)

0 comments on commit 96df646

Please sign in to comment.