Merge branch 'master' into master+ing-410-bigquery-sampled-profiling-fix
anshbansal authored Nov 1, 2023
2 parents df25dcc + 876de21 commit 0e6d5a3
Showing 19 changed files with 4,343 additions and 4,156 deletions.
6 changes: 5 additions & 1 deletion docker/docker-compose-with-cassandra.yml
@@ -200,7 +200,10 @@ services:
retries: 5
timeout: 5s
volumes:
- zkdata:/var/lib/zookeeper
# See https://stackoverflow.com/a/61008432 for why we need two volumes.
# See also: https://docs.confluent.io/platform/current/installation/docker/operations/external-volumes.html#data-volumes-for-kafka-and-zk
- zkdata:/var/lib/zookeeper/data
- zklogs:/var/lib/zookeeper/log
networks:
default:
name: datahub_network
@@ -210,3 +213,4 @@ volumes:
neo4jdata:
broker:
zkdata:
zklogs:
6 changes: 5 additions & 1 deletion docker/docker-compose-without-neo4j.yml
@@ -174,11 +174,15 @@ services:
retries: 3
timeout: 5s
volumes:
- zkdata:/var/lib/zookeeper
# See https://stackoverflow.com/a/61008432 for why we need two volumes.
# See also: https://docs.confluent.io/platform/current/installation/docker/operations/external-volumes.html#data-volumes-for-kafka-and-zk
- zkdata:/var/lib/zookeeper/data
- zklogs:/var/lib/zookeeper/log
networks:
default:
name: datahub_network
volumes:
esdata:
broker:
zkdata:
zklogs:
6 changes: 5 additions & 1 deletion docker/docker-compose.yml
@@ -195,7 +195,10 @@ services:
retries: 3
timeout: 5s
volumes:
- zkdata:/var/lib/zookeeper
# See https://stackoverflow.com/a/61008432 for why we need two volumes.
# See also: https://docs.confluent.io/platform/current/installation/docker/operations/external-volumes.html#data-volumes-for-kafka-and-zk
- zkdata:/var/lib/zookeeper/data
- zklogs:/var/lib/zookeeper/log
networks:
default:
name: datahub_network
@@ -204,3 +207,4 @@ volumes:
neo4jdata:
broker:
zkdata:
zklogs:
4 changes: 3 additions & 1 deletion docker/quickstart/docker-compose-m1.quickstart.yml
@@ -300,11 +300,13 @@ services:
ports:
- ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181
volumes:
- zkdata:/var/lib/zookeeper
- zkdata:/var/lib/zookeeper/data
- zklogs:/var/lib/zookeeper/log
version: '3.9'
volumes:
broker: null
esdata: null
mysqldata: null
neo4jdata: null
zkdata: null
zklogs: null
@@ -274,10 +274,12 @@ services:
ports:
- ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181
volumes:
- zkdata:/var/lib/zookeeper
- zkdata:/var/lib/zookeeper/data
- zklogs:/var/lib/zookeeper/log
version: '3.9'
volumes:
broker: null
esdata: null
mysqldata: null
zkdata: null
zklogs: null
@@ -274,10 +274,12 @@ services:
ports:
- ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181
volumes:
- zkdata:/var/lib/zookeeper
- zkdata:/var/lib/zookeeper/data
- zklogs:/var/lib/zookeeper/log
version: '3.9'
volumes:
broker: null
esdata: null
mysqldata: null
zkdata: null
zklogs: null
4 changes: 3 additions & 1 deletion docker/quickstart/docker-compose.quickstart.yml
@@ -300,11 +300,13 @@ services:
ports:
- ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181
volumes:
- zkdata:/var/lib/zookeeper
- zkdata:/var/lib/zookeeper/data
- zklogs:/var/lib/zookeeper/log
version: '3.9'
volumes:
broker: null
esdata: null
mysqldata: null
neo4jdata: null
zkdata: null
zklogs: null
7 changes: 7 additions & 0 deletions gradle.properties
@@ -6,9 +6,16 @@ org.gradle.caching=false
# Increase gradle JVM memory to 3GB to allow tests to run locally
org.gradle.jvmargs=-Xmx3000m
# Increase retries to 5 (from default of 3) and increase interval from 125ms to 1s.
# Based on this thread https://github.com/gradle/gradle/issues/4629, it's unclear
# if we should be using systemProp or not. We're using both for now.
org.gradle.internal.repository.max.retries=5
org.gradle.internal.repository.max.tentatives=5
org.gradle.internal.repository.initial.backoff=1000
systemProp.org.gradle.internal.http.connectionTimeout=120000
systemProp.org.gradle.internal.http.socketTimeout=120000
systemProp.org.gradle.internal.repository.max.retries=5
systemProp.org.gradle.internal.repository.max.tentatives=5
systemProp.org.gradle.internal.repository.initial.backoff=1000

# Needed to publish to Nexus from a sub-module
gnsp.disableApplyOnlyOnRootProjectEnforcement=true
2 changes: 1 addition & 1 deletion metadata-ingestion/docs/sources/teradata/teradata_pre.md
@@ -25,4 +25,4 @@ will fit for your queries (the default query text size Teradata captures is max
REPLACE QUERY LOGGING LIMIT SQLTEXT=2000 ON ALL;
```
See more here about query logging:
[https://docs.teradata.com/r/Teradata-VantageCloud-Lake/Database-Reference/Database-Administration/Tracking-Query-Behavior-with-Database-Query-Logging-Operational-DBAs]()
[https://docs.teradata.com/r/Teradata-VantageCloud-Lake/Database-Reference/Database-Administration/Tracking-Query-Behavior-with-Database-Query-Logging-Operational-DBAs](https://docs.teradata.com/r/Teradata-VantageCloud-Lake/Database-Reference/Database-Administration/Tracking-Query-Behavior-with-Database-Query-Logging-Operational-DBAs)
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
@@ -108,7 +108,7 @@
sqlglot_lib = {
# Using an Acryl fork of sqlglot.
# https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:hsheth?expand=1
"acryl-sqlglot==18.17.1.dev16",
"acryl-sqlglot==19.0.2.dev10",
}

sql_common = (
74 changes: 56 additions & 18 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
@@ -15,7 +15,12 @@
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
@@ -25,14 +30,21 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.schema_inference.object import (
SchemaDescription,
construct_schema,
)
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
StatefulIngestionConfigBase,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
ArrayTypeClass,
BooleanTypeClass,
@@ -48,7 +60,10 @@
TimeTypeClass,
UnionTypeClass,
)
from datahub.metadata.schema_classes import DatasetPropertiesClass
from datahub.metadata.schema_classes import (
DataPlatformInstanceClass,
DatasetPropertiesClass,
)

logger = logging.getLogger(__name__)

@@ -59,7 +74,9 @@
DENY_DATABASE_LIST = set(["admin", "config", "local"])


class MongoDBConfig(PlatformInstanceConfigMixin, EnvConfigMixin):
class MongoDBConfig(
PlatformInstanceConfigMixin, EnvConfigMixin, StatefulIngestionConfigBase
):
# See the MongoDB authentication docs for details and examples.
# https://pymongo.readthedocs.io/en/stable/examples/authentication.html
connect_uri: str = Field(
@@ -99,6 +116,8 @@ class MongoDBConfig(PlatformInstanceConfigMixin, EnvConfigMixin):
default=AllowDenyPattern.allow_all(),
description="regex patterns for collections to filter in ingestion.",
)
# Custom Stateful Ingestion settings
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None

@validator("maxDocumentSize")
def check_max_doc_size_filter_is_valid(cls, doc_size_filter_value):
@@ -108,7 +127,7 @@ def check_max_doc_size_filter_is_valid(cls, doc_size_filter_value):


@dataclass
class MongoDBSourceReport(SourceReport):
class MongoDBSourceReport(StaleEntityRemovalSourceReport):
filtered: List[str] = field(default_factory=list)

def report_dropped(self, name: str) -> None:
@@ -129,6 +148,7 @@ def report_dropped(self, name: str) -> None:
bson.timestamp.Timestamp: "timestamp",
bson.dbref.DBRef: "dbref",
bson.objectid.ObjectId: "oid",
bson.Decimal128: "numberDecimal",
"mixed": "mixed",
}

@@ -145,6 +165,7 @@ def report_dropped(self, name: str) -> None:
bson.timestamp.Timestamp: TimeTypeClass,
bson.dbref.DBRef: BytesTypeClass,
bson.objectid.ObjectId: BytesTypeClass,
bson.Decimal128: NumberTypeClass,
dict: RecordTypeClass,
"mixed": UnionTypeClass,
}
@@ -206,7 +227,7 @@ def construct_schema_pymongo(
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.SCHEMA_METADATA, "Enabled by default")
@dataclass
class MongoDBSource(Source):
class MongoDBSource(StatefulIngestionSourceBase):
"""
This plugin extracts the following:
@@ -227,7 +248,7 @@ class MongoDBSource(Source):
mongo_client: MongoClient

def __init__(self, ctx: PipelineContext, config: MongoDBConfig):
super().__init__(ctx)
super().__init__(config, ctx)
self.config = config
self.report = MongoDBSourceReport()

@@ -254,6 +275,14 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "MongoDBSource":
config = MongoDBConfig.parse_obj(config_dict)
return cls(ctx, config)

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
).workunit_processor,
]

def get_pymongo_type_string(
self, field_type: Union[Type, str], collection_name: str
) -> str:
@@ -332,16 +361,18 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
platform_instance=self.config.platform_instance,
)

dataset_snapshot = DatasetSnapshot(
urn=dataset_urn,
aspects=[],
)
if self.config.platform_instance:
data_platform_instance = DataPlatformInstanceClass(
platform=make_data_platform_urn(platform),
instance=make_dataplatform_instance_urn(
platform, self.config.platform_instance
),
)

dataset_properties = DatasetPropertiesClass(
tags=[],
customProperties={},
)
dataset_snapshot.aspects.append(dataset_properties)

if self.config.enableSchemaInference:
assert self.config.maxDocumentSize is not None
@@ -412,13 +443,20 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
fields=canonical_schema,
)

dataset_snapshot.aspects.append(schema_metadata)

# TODO: use list_indexes() or index_information() to get index information
# See https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.list_indexes.

mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
yield MetadataWorkUnit(id=dataset_name, mce=mce)
yield from [
mcp.as_workunit()
for mcp in MetadataChangeProposalWrapper.construct_many(
entityUrn=dataset_urn,
aspects=[
schema_metadata,
dataset_properties,
data_platform_instance,
],
)
]

def is_server_version_gte_4_4(self) -> bool:
try:
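The MongoDB source changes above replace the snapshot-based MetadataChangeEvent emission with per-aspect MetadataChangeProposalWrapper work units and wire the source into stateful ingestion. The following is a minimal sketch of that proposal-based emission pattern outside the source class; it is not code from this commit, and the platform instance, database, and collection names are placeholders.

```python
# Minimal sketch (not part of the commit) of emitting dataset aspects as
# MetadataChangeProposalWrapper work units instead of a DatasetSnapshot-based
# MetadataChangeEvent. "example_db.example_collection" is a placeholder name.
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import DatasetPropertiesClass

dataset_urn = make_dataset_urn_with_platform_instance(
    platform="mongodb",
    name="example_db.example_collection",  # placeholder dataset name
    platform_instance=None,  # set to a platform instance name if configured
    env="PROD",
)

dataset_properties = DatasetPropertiesClass(tags=[], customProperties={})

# construct_many builds one change proposal per aspect against the same entity
# URN; each proposal is then converted into a MetadataWorkUnit for the pipeline.
workunits = [
    mcp.as_workunit()
    for mcp in MetadataChangeProposalWrapper.construct_many(
        entityUrn=dataset_urn,
        aspects=[dataset_properties],
    )
]
```

Roughly, the StaleEntityRemovalHandler registered in get_workunit_processors tracks the URNs observed in these work units, so datasets that stop appearing in later runs can be flagged for removal.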
