diff --git a/datahub-web-react/src/app/ingest/source/builder/constants.ts b/datahub-web-react/src/app/ingest/source/builder/constants.ts index f892f0ed525d2..58525b3e88f97 100644 --- a/datahub-web-react/src/app/ingest/source/builder/constants.ts +++ b/datahub-web-react/src/app/ingest/source/builder/constants.ts @@ -38,6 +38,7 @@ import sigmaLogo from '../../../../images/sigmalogo.png'; import sacLogo from '../../../../images/saclogo.svg'; import cassandraLogo from '../../../../images/cassandralogo.png'; import datahubLogo from '../../../../images/datahublogo.png'; +import neo4j from '../../../../images/neo4j.png'; export const ATHENA = 'athena'; export const ATHENA_URN = `urn:li:dataPlatform:${ATHENA}`; @@ -137,6 +138,8 @@ export const DATAHUB_GC = 'datahub-gc'; export const DATAHUB_LINEAGE_FILE = 'datahub-lineage-file'; export const DATAHUB_BUSINESS_GLOSSARY = 'datahub-business-glossary'; export const DATAHUB_URN = `urn:li:dataPlatform:${DATAHUB}`; +export const NEO4J = 'neo4j'; +export const NEO4J_URN = `urn:li:dataPlatform:${NEO4J}`; export const PLATFORM_URN_TO_LOGO = { [ATHENA_URN]: athenaLogo, @@ -180,6 +183,7 @@ export const PLATFORM_URN_TO_LOGO = { [SAC_URN]: sacLogo, [CASSANDRA_URN]: cassandraLogo, [DATAHUB_URN]: datahubLogo, + [NEO4J_URN]: neo4j, }; export const SOURCE_TO_PLATFORM_URN = { diff --git a/datahub-web-react/src/app/ingest/source/builder/sources.json b/datahub-web-react/src/app/ingest/source/builder/sources.json index 70d9baabdb4bc..776b6703895c3 100644 --- a/datahub-web-react/src/app/ingest/source/builder/sources.json +++ b/datahub-web-react/src/app/ingest/source/builder/sources.json @@ -309,7 +309,7 @@ "displayName": "Dremio", "description": "Import Spaces, Sources, Tables and statistics from Dremio.", "docsUrl": "https://datahubproject.io/docs/metadata-ingestion/", - "recipe": "source:\n type: dremio\n config:\n # Coordinates\n hostname: null\n port: null\n #true if https, otherwise false\n tls: true\n\n #For cloud instance\n #is_dremio_cloud: True\n #dremio_cloud_project_id: \n\n #Credentials with personal access token\n authentication_method: PAT\n password: pass\n\n #Or Credentials with basic auth\n #authentication_method: password\n #username: null\n #password: null\n\n stateful_ingestion:\n enabled: true" + "recipe": "source:\n type: dremio\n config:\n # Coordinates\n hostname: null\n port: null\n #true if https, otherwise false\n tls: true\n\n #For cloud instance\n #is_dremio_cloud: True\n #dremio_cloud_project_id: \n\n #Credentials with personal access token\n authentication_method: PAT\n password: pass\n\n #Or Credentials with basic auth\n #authentication_method: password\n #username: null\n #password: null\n\n ingest_owner: true\n\n stateful_ingestion:\n enabled: true" }, { "urn": "urn:li:dataPlatform:cassandra", @@ -325,5 +325,13 @@ "description": "Ingest databases and tables from any Iceberg catalog implementation", "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/iceberg", "recipe": "source:\n type: \"iceberg\"\n config:\n env: dev\n # each thread will open internet connections to fetch manifest files independently, \n # this value needs to be adjusted with ulimit\n processing_threads: 1 \n # a single catalog definition with a form of a dictionary\n catalog: \n demo: # name of the catalog\n type: \"rest\" # other types are available\n uri: \"uri\"\n s3.access-key-id: \"access-key\"\n s3.secret-access-key: \"secret-access-key\"\n s3.region: \"aws-region\"\n profiling:\n enabled: false\n" + }, + { + "urn": "urn:li:dataPlatform:neo4j", 
+ "name": "neo4j", + "displayName": "Neo4j", + "description": "Import Nodes and Relationships from Neo4j.", + "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/neo4j/", + "recipe": "source:\n type: 'neo4j'\n config:\n uri: 'neo4j+ssc://host:7687'\n username: 'neo4j'\n password: 'password'\n env: 'PROD'\n\nsink:\n type: \"datahub-rest\"\n config:\n server: 'http://localhost:8080'" } ] diff --git a/datahub-web-react/src/images/neo4j.png b/datahub-web-react/src/images/neo4j.png new file mode 100644 index 0000000000000..b03b2a4532b3b Binary files /dev/null and b/datahub-web-react/src/images/neo4j.png differ diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/CustomPropertiesPatchBuilder.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/CustomPropertiesPatchBuilder.java index e4143851afbe5..b78d563147e63 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/CustomPropertiesPatchBuilder.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/CustomPropertiesPatchBuilder.java @@ -17,10 +17,16 @@ public class CustomPropertiesPatchBuilder> operations = new ArrayList<>(); + private final List> operations; public CustomPropertiesPatchBuilder(T parentBuilder) { this.parent = parentBuilder; + if (parentBuilder != null) { + // If a parent builder is provided, we use the same path operations list. + this.operations = parentBuilder.getPathValues(); + } else { + this.operations = new ArrayList<>(); + } } /** @@ -72,9 +78,4 @@ public CustomPropertiesPatchBuilder setProperties(Map propert public T getParent() { return parent; } - - @Override - public List> getSubPaths() { - return operations; - } } diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/DataFlowInfoPatchBuilder.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/DataFlowInfoPatchBuilder.java index 6a114d90875fe..231956a2fcec8 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/DataFlowInfoPatchBuilder.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/DataFlowInfoPatchBuilder.java @@ -4,12 +4,10 @@ import static com.linkedin.metadata.Constants.DATA_FLOW_ENTITY_NAME; import static com.linkedin.metadata.Constants.DATA_FLOW_INFO_ASPECT_NAME; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.linkedin.common.TimeStamp; import com.linkedin.metadata.aspect.patch.PatchOperationType; import com.linkedin.metadata.aspect.patch.builder.subtypesupport.CustomPropertiesPatchBuilderSupport; -import java.util.List; import java.util.Map; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -87,28 +85,23 @@ public DataFlowInfoPatchBuilder setCreated(@Nullable TimeStamp created) { } public DataFlowInfoPatchBuilder setLastModified(@Nullable TimeStamp lastModified) { + ObjectNode lastModifiedNode = instance.objectNode(); if (lastModified == null) { pathValues.add( ImmutableTriple.of( PatchOperationType.REMOVE.getValue(), BASE_PATH + LAST_MODIFIED_KEY, null)); + } else { + lastModifiedNode.put(TIME_KEY, lastModified.getTime()); + if (lastModified.getActor() != null) { + lastModifiedNode.put(ACTOR_KEY, lastModified.getActor().toString()); + } + pathValues.add( + ImmutableTriple.of( + PatchOperationType.ADD.getValue(), BASE_PATH + LAST_MODIFIED_KEY, lastModifiedNode)); } - ObjectNode lastModifiedNode = 
instance.objectNode(); - lastModifiedNode.put(TIME_KEY, lastModified.getTime()); - if (lastModified.getActor() != null) { - lastModifiedNode.put(ACTOR_KEY, lastModified.getActor().toString()); - } - pathValues.add( - ImmutableTriple.of( - PatchOperationType.ADD.getValue(), BASE_PATH + LAST_MODIFIED_KEY, lastModifiedNode)); return this; } - @Override - protected List> getPathValues() { - pathValues.addAll(customPropertiesPatchBuilder.getSubPaths()); - return pathValues; - } - @Override protected String getAspectName() { return DATA_FLOW_INFO_ASPECT_NAME; diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/DataJobInfoPatchBuilder.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/DataJobInfoPatchBuilder.java index 99c0ac6c15eb1..dd17fbacf338e 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/DataJobInfoPatchBuilder.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/DataJobInfoPatchBuilder.java @@ -4,13 +4,11 @@ import static com.linkedin.metadata.Constants.DATA_JOB_ENTITY_NAME; import static com.linkedin.metadata.Constants.DATA_JOB_INFO_ASPECT_NAME; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.linkedin.common.TimeStamp; import com.linkedin.common.urn.DataFlowUrn; import com.linkedin.metadata.aspect.patch.PatchOperationType; import com.linkedin.metadata.aspect.patch.builder.subtypesupport.CustomPropertiesPatchBuilderSupport; -import java.util.List; import java.util.Map; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -113,12 +111,6 @@ public DataJobInfoPatchBuilder setLastModified(@Nullable TimeStamp lastModified) return this; } - @Override - protected List> getPathValues() { - pathValues.addAll(customPropertiesPatchBuilder.getSubPaths()); - return pathValues; - } - @Override protected String getAspectName() { return DATA_JOB_INFO_ASPECT_NAME; diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/DatasetPropertiesPatchBuilder.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/DatasetPropertiesPatchBuilder.java index 31e181fc244fb..60d52c7c72088 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/DatasetPropertiesPatchBuilder.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/DatasetPropertiesPatchBuilder.java @@ -4,10 +4,8 @@ import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; import static com.linkedin.metadata.Constants.DATASET_PROPERTIES_ASPECT_NAME; -import com.fasterxml.jackson.databind.JsonNode; import com.linkedin.metadata.aspect.patch.PatchOperationType; import com.linkedin.metadata.aspect.patch.builder.subtypesupport.CustomPropertiesPatchBuilderSupport; -import java.util.List; import java.util.Map; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -116,12 +114,6 @@ public DatasetPropertiesPatchBuilder setCustomProperties(Map pro return this; } - @Override - protected List> getPathValues() { - pathValues.addAll(customPropertiesPatchBuilder.getSubPaths()); - return pathValues; - } - @Override protected String getAspectName() { return DATASET_PROPERTIES_ASPECT_NAME; diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/subtypesupport/IntermediatePatchBuilder.java 
b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/subtypesupport/IntermediatePatchBuilder.java index d891a6b9673da..cd74818c24e19 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/subtypesupport/IntermediatePatchBuilder.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/subtypesupport/IntermediatePatchBuilder.java @@ -1,9 +1,6 @@ package com.linkedin.metadata.aspect.patch.builder.subtypesupport; -import com.fasterxml.jackson.databind.JsonNode; import com.linkedin.metadata.aspect.patch.builder.AbstractMultiFieldPatchBuilder; -import java.util.List; -import org.apache.commons.lang3.tuple.ImmutableTriple; /** * Used for supporting intermediate subtypes when constructing a patch for an aspect that includes @@ -15,10 +12,4 @@ public interface IntermediatePatchBuilder> getSubPaths(); } diff --git a/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/builder/DataFlowInfoPatchBuilderTest.java b/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/builder/DataFlowInfoPatchBuilderTest.java new file mode 100644 index 0000000000000..612282b7c0238 --- /dev/null +++ b/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/builder/DataFlowInfoPatchBuilderTest.java @@ -0,0 +1,280 @@ +package com.linkedin.metadata.aspect.patch.builder; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.linkedin.common.TimeStamp; +import com.linkedin.common.urn.Urn; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.tuple.ImmutableTriple; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class DataFlowInfoPatchBuilderTest { + + private TestableDataFlowInfoPatchBuilder builder; + private static final String TEST_URN = "urn:li:dataFlow:(test,flow1,PROD)"; + + // Test helper class to expose protected method + private static class TestableDataFlowInfoPatchBuilder extends DataFlowInfoPatchBuilder { + public List> getTestPathValues() { + return getPathValues(); + } + } + + @BeforeMethod + public void setup() throws URISyntaxException { + builder = new TestableDataFlowInfoPatchBuilder(); + builder.urn(Urn.createFromString(TEST_URN)); + } + + @Test + public void testBuildDoesNotAffectPathValues() throws URISyntaxException { + String testName = "testFlow"; + String testDescription = "Test description"; + + builder.setName(testName).setDescription(testDescription).addCustomProperty("key1", "value1"); + + // First call build() + builder.build(); + + // Then verify we can still access pathValues and they're correct + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 3); + + // Verify the operations are still intact + assertEquals(pathValues.get(0).getLeft(), "add"); + assertEquals(pathValues.get(0).getMiddle(), "/name"); + assertEquals(pathValues.get(0).getRight().asText(), testName); + + assertEquals(pathValues.get(1).getLeft(), "add"); + assertEquals(pathValues.get(1).getMiddle(), "/description"); + assertEquals(pathValues.get(1).getRight().asText(), testDescription); + + assertEquals(pathValues.get(2).getLeft(), "add"); + assertTrue(pathValues.get(2).getMiddle().startsWith("/customProperties/")); + 
assertEquals(pathValues.get(2).getRight().asText(), "value1"); + + // Verify we can call build() again without issues + builder.build(); + + // And verify pathValues are still accessible and correct + pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 3); + } + + @Test + public void testSetName() { + String testName = "testFlow"; + builder.setName(testName); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "add"); + assertEquals(operation.getMiddle(), "/name"); + assertEquals(operation.getRight().asText(), testName); + } + + @Test + public void testSetDescription() { + String testDescription = "Test description"; + builder.setDescription(testDescription); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "add"); + assertEquals(operation.getMiddle(), "/description"); + assertEquals(operation.getRight().asText(), testDescription); + } + + @Test + public void testSetDescriptionNull() { + builder.setDescription(null); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "remove"); + assertEquals(operation.getMiddle(), "/description"); + assertNull(operation.getRight()); + } + + @Test + public void testSetProject() { + String testProject = "testProject"; + builder.setProject(testProject); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "add"); + assertEquals(operation.getMiddle(), "/project"); + assertEquals(operation.getRight().asText(), testProject); + } + + @Test + public void testSetProjectNull() { + builder.setProject(null); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "remove"); + assertEquals(operation.getMiddle(), "/project"); + assertNull(operation.getRight()); + } + + @Test + public void testSetCreated() throws URISyntaxException { + long time = System.currentTimeMillis(); + String actor = "urn:li:corpuser:testUser"; + TimeStamp created = new TimeStamp(); + created.setTime(time); + created.setActor(Urn.createFromString(actor)); + + builder.setCreated(created); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "add"); + assertEquals(operation.getMiddle(), "/created"); + JsonNode createdNode = operation.getRight(); + assertTrue(createdNode.isObject()); + assertEquals(createdNode.get("time").asLong(), time); + assertEquals(createdNode.get("actor").asText(), actor); + } + + @Test + public void testSetCreatedNull() { + builder.setCreated(null); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple 
operation = pathValues.get(0); + assertEquals(operation.getLeft(), "remove"); + assertEquals(operation.getMiddle(), "/created"); + assertNull(operation.getRight()); + } + + @Test + public void testSetLastModified() throws URISyntaxException { + long time = System.currentTimeMillis(); + String actor = "urn:li:corpuser:testUser"; + TimeStamp lastModified = new TimeStamp(); + lastModified.setTime(time); + lastModified.setActor(Urn.createFromString(actor)); + + builder.setLastModified(lastModified); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "add"); + assertEquals(operation.getMiddle(), "/lastModified"); + JsonNode lastModifiedNode = operation.getRight(); + assertTrue(lastModifiedNode.isObject()); + assertEquals(lastModifiedNode.get("time").asLong(), time); + assertEquals(lastModifiedNode.get("actor").asText(), actor); + } + + @Test + public void testSetLastModifiedNull() { + builder.setLastModified(null); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "remove"); + assertEquals(operation.getMiddle(), "/lastModified"); + assertNull(operation.getRight()); + } + + @Test + public void testAddCustomProperties() { + builder.addCustomProperty("key1", "value1").addCustomProperty("key2", "value2"); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 2); + + pathValues.forEach( + operation -> { + assertEquals(operation.getLeft(), "add"); + assertTrue(operation.getMiddle().startsWith("/customProperties/")); + assertTrue(operation.getRight().isTextual()); + }); + } + + @Test + public void testRemoveCustomProperty() { + builder.removeCustomProperty("key1"); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "remove"); + assertEquals(operation.getMiddle(), "/customProperties/key1"); + assertNull(operation.getRight()); + } + + @Test + public void testSetCustomProperties() { + Map properties = new HashMap<>(); + properties.put("key1", "value1"); + properties.put("key2", "value2"); + + builder.setCustomProperties(properties); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "add"); + assertEquals(operation.getMiddle(), "/customProperties"); + assertTrue(operation.getRight().isObject()); + } +} diff --git a/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/builder/DataJobInputOutputPatchBuilderTest.java b/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/builder/DataJobInputOutputPatchBuilderTest.java new file mode 100644 index 0000000000000..dc141863e2443 --- /dev/null +++ b/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/builder/DataJobInputOutputPatchBuilderTest.java @@ -0,0 +1,237 @@ +package com.linkedin.metadata.aspect.patch.builder; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import 
static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.linkedin.common.Edge; +import com.linkedin.common.urn.DataJobUrn; +import com.linkedin.common.urn.DatasetUrn; +import com.linkedin.common.urn.Urn; +import com.linkedin.metadata.graph.LineageDirection; +import java.net.URISyntaxException; +import java.util.List; +import org.apache.commons.lang3.tuple.ImmutableTriple; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class DataJobInputOutputPatchBuilderTest { + + private TestableDataJobInputOutputPatchBuilder builder; + private static final String TEST_DATAJOB_URN = + "urn:li:dataJob:(urn:li:dataFlow:(test,flow1,PROD),job1)"; + private static final String TEST_DATASET_URN = + "urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)"; + private static final String TEST_DATASET_FIELD_URN = + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD),id)"; + + // Test helper class to expose protected method + private static class TestableDataJobInputOutputPatchBuilder + extends DataJobInputOutputPatchBuilder { + public List> getTestPathValues() { + return getPathValues(); + } + } + + @BeforeMethod + public void setup() throws URISyntaxException { + builder = new TestableDataJobInputOutputPatchBuilder(); + builder.urn(Urn.createFromString(TEST_DATAJOB_URN)); + } + + @Test + public void testBuildDoesNotAffectPathValues() throws URISyntaxException { + DataJobUrn dataJobUrn = DataJobUrn.createFromString(TEST_DATAJOB_URN); + DatasetUrn datasetUrn = DatasetUrn.createFromString(TEST_DATASET_URN); + + builder + .addInputDatajobEdge(dataJobUrn) + .addInputDatasetEdge(datasetUrn) + .addOutputDatasetEdge(datasetUrn); + + // First call build() + builder.build(); + + // Then verify we can still access pathValues and they're correct + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 3); + + // Verify we can call build() again without issues + builder.build(); + + // And verify pathValues are still accessible and correct + pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 3); + } + + @Test + public void testAddInputDatajobEdge() throws URISyntaxException { + DataJobUrn dataJobUrn = DataJobUrn.createFromString(TEST_DATAJOB_URN); + builder.addInputDatajobEdge(dataJobUrn); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "add"); + assertTrue(operation.getMiddle().startsWith("/inputDatajobEdges/")); + assertTrue(operation.getRight().isObject()); + assertEquals(operation.getRight().get("destinationUrn").asText(), dataJobUrn.toString()); + } + + @Test + public void testRemoveInputDatajobEdge() throws URISyntaxException { + DataJobUrn dataJobUrn = DataJobUrn.createFromString(TEST_DATAJOB_URN); + builder.removeInputDatajobEdge(dataJobUrn); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "remove"); + assertTrue(operation.getMiddle().startsWith("/inputDatajobEdges/")); + assertNull(operation.getRight()); + } + + @Test + public void testAddInputDatasetEdge() throws URISyntaxException { + DatasetUrn 
datasetUrn = DatasetUrn.createFromString(TEST_DATASET_URN); + builder.addInputDatasetEdge(datasetUrn); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "add"); + assertTrue(operation.getMiddle().startsWith("/inputDatasetEdges/")); + assertTrue(operation.getRight().isObject()); + assertEquals(operation.getRight().get("destinationUrn").asText(), datasetUrn.toString()); + } + + @Test + public void testRemoveInputDatasetEdge() throws URISyntaxException { + DatasetUrn datasetUrn = DatasetUrn.createFromString(TEST_DATASET_URN); + builder.removeInputDatasetEdge(datasetUrn); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "remove"); + assertTrue(operation.getMiddle().startsWith("/inputDatasetEdges/")); + assertNull(operation.getRight()); + } + + @Test + public void testAddOutputDatasetEdge() throws URISyntaxException { + DatasetUrn datasetUrn = DatasetUrn.createFromString(TEST_DATASET_URN); + builder.addOutputDatasetEdge(datasetUrn); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "add"); + assertTrue(operation.getMiddle().startsWith("/outputDatasetEdges/")); + assertTrue(operation.getRight().isObject()); + assertEquals(operation.getRight().get("destinationUrn").asText(), datasetUrn.toString()); + } + + @Test + public void testAddInputDatasetField() throws URISyntaxException { + Urn fieldUrn = Urn.createFromString(TEST_DATASET_FIELD_URN); + builder.addInputDatasetField(fieldUrn); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "add"); + assertTrue(operation.getMiddle().startsWith("/inputDatasetFields/")); + assertTrue(operation.getRight().isTextual()); + assertEquals(operation.getRight().asText(), fieldUrn.toString()); + } + + @Test + public void testRemoveInputDatasetField() throws URISyntaxException { + Urn fieldUrn = Urn.createFromString(TEST_DATASET_FIELD_URN); + builder.removeInputDatasetField(fieldUrn); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "remove"); + assertTrue(operation.getMiddle().startsWith("/inputDatasetFields/")); + assertNull(operation.getRight()); + } + + @Test + public void testAddOutputDatasetField() throws URISyntaxException { + Urn fieldUrn = Urn.createFromString(TEST_DATASET_FIELD_URN); + builder.addOutputDatasetField(fieldUrn); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "add"); + assertTrue(operation.getMiddle().startsWith("/outputDatasetFields/")); + assertTrue(operation.getRight().isTextual()); + assertEquals(operation.getRight().asText(), fieldUrn.toString()); + } + + @Test + public void testAddEdgeWithDirection() 
throws URISyntaxException { + DatasetUrn datasetUrn = DatasetUrn.createFromString(TEST_DATASET_URN); + Edge edge = new Edge(); + edge.setDestinationUrn(datasetUrn); + + builder.addEdge(edge, LineageDirection.UPSTREAM); + builder.build(); + + List> pathValues = builder.getTestPathValues(); + assertNotNull(pathValues); + assertEquals(pathValues.size(), 1); + + ImmutableTriple operation = pathValues.get(0); + assertEquals(operation.getLeft(), "add"); + assertTrue(operation.getMiddle().startsWith("/inputDatasetEdges/")); + assertTrue(operation.getRight().isObject()); + assertEquals(operation.getRight().get("destinationUrn").asText(), datasetUrn.toString()); + } + + @Test + public void testInvalidEntityTypeThrowsException() throws URISyntaxException { + Urn invalidUrn = Urn.createFromString("urn:li:glossaryTerm:invalid"); + Edge edge = new Edge(); + edge.setDestinationUrn(invalidUrn); + + assertThrows( + IllegalArgumentException.class, + () -> { + builder.addEdge(edge, LineageDirection.UPSTREAM); + }); + } +} diff --git a/metadata-ingestion/build.gradle b/metadata-ingestion/build.gradle index 4e3f1ca91766c..4e03dd6e2faaf 100644 --- a/metadata-ingestion/build.gradle +++ b/metadata-ingestion/build.gradle @@ -30,15 +30,7 @@ task environmentSetup(type: Exec, dependsOn: checkPythonVersion) { "touch ${sentinel_file}" } -task runPreFlightScript(type: Exec, dependsOn: environmentSetup) { - def sentinel_file = ".preflight_sentinel" - outputs.file(sentinel_file) - commandLine 'bash', '-c', - "scripts/datahub_preflight.sh && " + - "touch ${sentinel_file}" -} - -task installPackageOnly(type: Exec, dependsOn: runPreFlightScript) { +task installPackageOnly(type: Exec, dependsOn: environmentSetup) { def sentinel_file = "${venv_name}/.build_install_package_only_sentinel" inputs.file file('setup.py') outputs.file(sentinel_file) diff --git a/metadata-ingestion/docs/sources/business-glossary/datahub-business-glossary.md b/metadata-ingestion/docs/sources/business-glossary/datahub-business-glossary.md index 3d2a0509492bd..3433a853ea9b0 100644 --- a/metadata-ingestion/docs/sources/business-glossary/datahub-business-glossary.md +++ b/metadata-ingestion/docs/sources/business-glossary/datahub-business-glossary.md @@ -7,7 +7,7 @@ The business glossary source file should be a .yml file with the following top-l Example **Glossary**: ```yaml -version: 1 # the version of business glossary file config the config conforms to. Currently the only version released is `1`. +version: "1" # the version of business glossary file config the config conforms to. Currently the only version released is `1`. source: DataHub # the source format of the terms. Currently only supports `DataHub` owners: # owners contains two nested fields users: # (optional) a list of user IDs @@ -60,7 +60,7 @@ Example **GlossaryTerm**: - Shipping.CountryCode - Shipping.StreetAddress custom_properties: # (optional) a map of key/value pairs of arbitrary custom properties - - is_used_for_compliance_tracking: true + - is_used_for_compliance_tracking: "true" knowledge_links: # (optional) a list of **KnowledgeCard** related to this term. 
These appear as links on the glossary node's page - url: "https://en.wikipedia.org/wiki/Address" label: Wiki link @@ -73,7 +73,7 @@ To see how these all work together, check out this comprehensive example busines Example business glossary file ```yaml -version: 1 +version: "1" source: DataHub owners: users: @@ -89,15 +89,15 @@ nodes: - name: Sensitive description: Sensitive Data custom_properties: - is_confidential: false + is_confidential: "false" - name: Confidential description: Confidential Data custom_properties: - is_confidential: true + is_confidential: "true" - name: HighlyConfidential description: Highly Confidential Data custom_properties: - is_confidential: true + is_confidential: "true" domain: Marketing - name: PersonalInformation description: All terms related to personal information @@ -148,7 +148,7 @@ nodes: related_terms: - Housing.Kitchen.Cutlery custom_properties: - - is_used_for_compliance_tracking: true + - is_used_for_compliance_tracking: "true" knowledge_links: - url: "https://en.wikipedia.org/wiki/Address" label: Wiki link @@ -237,7 +237,7 @@ Source file linked [here](https://github.com/datahub-project/datahub/blob/master ## Generating custom IDs for your terms -IDs are normally inferred from the glossary term/node's name, see the `enable_auto_id` config. But, if you need a stable +IDs are normally inferred from the glossary term/node's name, see the `enable_auto_id` config. But, if you need a stable identifier, you can generate a custom ID for your term. It should be unique across the entire Glossary. Here's an example ID: @@ -247,5 +247,5 @@ A note of caution: once you select a custom ID, it cannot be easily changed. ## Compatibility -Compatible with version 1 of business glossary format. -The source will be evolved as we publish newer versions of this format. \ No newline at end of file +Compatible with version 1 of business glossary format. +The source will be evolved as we publish newer versions of this format. diff --git a/metadata-ingestion/docs/sources/dremio/dremio_recipe.yml b/metadata-ingestion/docs/sources/dremio/dremio_recipe.yml index 9dcd4f8b337d1..d18d19da2de84 100644 --- a/metadata-ingestion/docs/sources/dremio/dremio_recipe.yml +++ b/metadata-ingestion/docs/sources/dremio/dremio_recipe.yml @@ -20,6 +20,8 @@ source: include_query_lineage: True + ingest_owner: true + #Optional source_mappings: - platform: s3 diff --git a/metadata-ingestion/docs/sources/neo4j/neo4j.md b/metadata-ingestion/docs/sources/neo4j/neo4j.md new file mode 100644 index 0000000000000..d4dab2c6c7e1f --- /dev/null +++ b/metadata-ingestion/docs/sources/neo4j/neo4j.md @@ -0,0 +1,20 @@ +## Integration Details + + + +Neo4j metadata will be ingested into DataHub using +`CALL apoc.meta.schema() YIELD value UNWIND keys(value) AS key RETURN key, value[key] AS value;` +The data that is returned will be parsed +and displayed as Nodes and Relationships in DataHub. Each object will be tagged to indicate what kind of DataHub +object it is. The defaults are 'Node' and 'Relationship'. These tag values can be overwritten in the recipe.
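+
+A rough sketch of the shape of one record returned by that call, inferred from the fields this source reads (`value["type"]`, `value["properties"]`, `value["relationships"]`); the label, property, and relationship names below are purely illustrative assumptions:
+
+```python
+# Hypothetical record for a node labelled "Person". Property types such as
+# "STRING"/"INTEGER" are lower-cased by the source and mapped to DataHub field types;
+# relationship entries carry a direction and the labels of the connected nodes.
+record = {
+    "key": "Person",
+    "value": {
+        "type": "node",
+        "properties": {"name": {"type": "STRING"}, "age": {"type": "INTEGER"}},
+        "relationships": {"WORKS_AT": {"direction": "out", "labels": ["Company"]}},
+    },
+}
+```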
+ + + +## Metadata Ingestion Quickstart + +### Prerequisites + +In order to ingest metadata from Neo4j, you will need: + +* Neo4j instance with APOC installed + diff --git a/metadata-ingestion/docs/sources/neo4j/neo4j_recipe.yml b/metadata-ingestion/docs/sources/neo4j/neo4j_recipe.yml new file mode 100644 index 0000000000000..463d65e7ba323 --- /dev/null +++ b/metadata-ingestion/docs/sources/neo4j/neo4j_recipe.yml @@ -0,0 +1,12 @@ +source: + type: 'neo4j' + config: + uri: 'neo4j+ssc://host:7687' + username: 'neo4j' + password: 'password' + env: 'PROD' + +sink: + type: "datahub-rest" + config: + server: 'http://localhost:8080' \ No newline at end of file diff --git a/metadata-ingestion/examples/bootstrap_data/banking_business_glossary.yaml b/metadata-ingestion/examples/bootstrap_data/banking_business_glossary.yaml index d0fea81748da5..a1adec58b7b5e 100644 --- a/metadata-ingestion/examples/bootstrap_data/banking_business_glossary.yaml +++ b/metadata-ingestion/examples/bootstrap_data/banking_business_glossary.yaml @@ -1,4 +1,4 @@ -version: 1 +version: "1" source: DataHub owners: users: @@ -68,4 +68,4 @@ nodes: - name: Auto Loan description: "A type of loan used to finance the purchase of a vehicle, with the vehicle serving as collateral for the loan." - name: Interest Rate - description: "The rate at which interest is charged on a loan or paid on an investment, expressed as a percentage of the principal amount." \ No newline at end of file + description: "The rate at which interest is charged on a loan or paid on an investment, expressed as a percentage of the principal amount." diff --git a/metadata-ingestion/examples/bootstrap_data/business_glossary.yml b/metadata-ingestion/examples/bootstrap_data/business_glossary.yml index 327246863b0ab..20d1011b96689 100644 --- a/metadata-ingestion/examples/bootstrap_data/business_glossary.yml +++ b/metadata-ingestion/examples/bootstrap_data/business_glossary.yml @@ -1,4 +1,4 @@ -version: 1 +version: "1" source: DataHub owners: users: @@ -11,20 +11,20 @@ nodes: - label: Wiki link for classification url: "https://en.wikipedia.org/wiki/Classification" custom_properties: - is_confidential: true + is_confidential: "true" terms: - name: Sensitive description: Sensitive Data custom_properties: - is_confidential: false + is_confidential: "false" - name: Confidential description: Confidential Data custom_properties: - is_confidential: true + is_confidential: "true" - name: HighlyConfidential description: Highly Confidential Data custom_properties: - is_confidential: true + is_confidential: "true" domain: Marketing - name: PersonalInformation description: All terms related to personal information @@ -72,7 +72,7 @@ nodes: - Shipping.CountryCode - Shipping.StreetAddress custom_properties: - is_used_for_compliance_tracking: true + is_used_for_compliance_tracking: "true" knowledge_links: - url: "https://en.wikipedia.org/wiki/Address" label: Wiki link diff --git a/metadata-ingestion/scripts/datahub_preflight.sh b/metadata-ingestion/scripts/datahub_preflight.sh deleted file mode 100755 index 9676964f4d49d..0000000000000 --- a/metadata-ingestion/scripts/datahub_preflight.sh +++ /dev/null @@ -1,108 +0,0 @@ -#!/bin/bash -e - -#From https://stackoverflow.com/questions/4023830/how-to-compare-two-strings-in-dot-separated-version-format-in-bash -verlte() { - [ "$1" == "$(echo -e "$1\n$2" | sort -V | head -n1)" ] -} - -brew_install() { - package=${1} - required_version=${2} - printf '\nπŸ”Ž Checking if %s installed\n' "${package}" - version=$(brew list --version|grep "$1"|awk 
'{ print $2 }') - - if [ -n "${version}" ]; then - if [ -n "$2" ] && ! verlte "${required_version}" "${version}"; then - printf 'πŸ”½ %s is installed but its version %s is lower than the required %s\n' "${package}" "${version}" "${required_version}. Updating version..." - brew update && brew upgrade "$1" && printf 'βœ… %s is installed\n' "${package}" - else - printf 'βœ… %s is already installed\n' "${package} with version ${version}" - fi - else - brew install "$1" && printf 'βœ… %s is installed\n' "${package}" - fi -} - -arm64_darwin_preflight() { - printf "✨ Creating/activating Virtual Environment\n" - python3 -m venv venv - source venv/bin/activate - - printf "πŸ”Ž Checking if Scipy installed\n" - if pip list | grep -F scipy; then - printf "βœ… Scipy already installed\n" - else - printf "Scipy not installed\n" - printf "β›… Installing prerequisities for scipy" - brew install openblas - OPENBLAS="$(brew --prefix openblas)" - export OPENBLAS - ##preinstall numpy and pythran from source - pip3 uninstall -y numpy pythran - pip3 install cython pybind11 - pip3 install --no-use-pep517 numpy - pip3 install pythran - pip3 install --no-use-pep517 scipy - fi - - brew_install "openssl@1.1" - brew install "postgresql@14" - - # postgresql installs libs in a strange way - # we first symlink /opt/postgresql@14 to /opt/postgresql - if [ ! -z $(brew --prefix)/opt/postgresql ]; then - printf "✨ Symlinking postgresql@14 to postgresql\n" - ln -sf $(brew --prefix postgresql@14) $(brew --prefix)/opt/postgresql - fi - # we then symlink all libs under /opt/postgresql@14/lib/postgresql@14 to /opt/postgresql@14/lib - if [ ! -z $(brew --prefix postgresql@14)/lib/postgresql@14 ]; then - printf "✨ Patching up libs in $(brew --prefix postgresql@14)/lib/postgresql@14)\n" - ln -sf $(brew --prefix postgresql@14)/lib/postgresql@14/* $(brew --prefix postgresql@14)/lib/ - fi - - printf "\e[38;2;0;255;0mβœ… Done\e[38;2;255;255;255m\n" - - printf "✨ Setting up environment variable:\n" - GRPC_PYTHON_BUILD_SYSTEM_OPENSSL=1 - export GRPC_PYTHON_BUILD_SYSTEM_OPENSSL - GRPC_PYTHON_BUILD_SYSTEM_ZLIB=1 - export GRPC_PYTHON_BUILD_SYSTEM_ZLIB - CPPFLAGS="-I$(brew --prefix openssl@1.1)/include" - export CPPFLAGS - LDFLAGS="-L$(brew --prefix openssl@1.1)/lib" - export LDFLAGS - -cat << EOF - export GRPC_PYTHON_BUILD_SYSTEM_OPENSSL=1 - export GRPC_PYTHON_BUILD_SYSTEM_ZLIB=1 - export CPPFLAGS="-I$(brew --prefix openssl@1.1)/include" - export LDFLAGS="-L$(brew --prefix openssl@1.1)/lib -L$(brew --prefix postgresql@14)/lib/postgresql@14" - -EOF - - if pip list | grep -F confluent-kafka; then - printf "βœ… confluent-kafka already installed\n" - else - pip3 install confluent-kafka - fi - - printf "✨ Setting up prerequisities\n" - # none for now, since jq was removed - - printf "\e[38;2;0;255;0mβœ… Done\e[38;2;255;255;255m\n" -} - - -printf "πŸ”Ž Checking if current directory is metadata-ingestion folder\n" -if [ "$(basename "$(pwd)")" != "metadata-ingestion" ]; then - printf "πŸ’₯ You should run this script in Datahub\'s metadata-ingestion folder but your folder is %s\n" "$(pwd)" - exit 123 -fi -printf 'βœ… Current folder is metadata-ingestion (%s) folder\n' "$(pwd)" -if [[ $(uname -m) == 'arm64' && $(uname) == 'Darwin' ]]; then - printf "πŸ‘Ÿ Running preflight for m1 mac\n" - arm64_darwin_preflight -fi - - -printf "\n\e[38;2;0;255;0mβœ… Preflight was successful\e[38;2;255;255;255m\n" diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 292038380e6a2..c6d55fb5bcc56 100644 --- a/metadata-ingestion/setup.py +++ 
b/metadata-ingestion/setup.py @@ -14,8 +14,8 @@ ) base_requirements = { - # Typing extension should be >=3.10.0.2 ideally but we can't restrict due to a Airflow 2.1 dependency conflict. - "typing_extensions>=3.7.4.3", + # Our min version of typing_extensions is somewhat constrained by Airflow. + "typing_extensions>=3.10.0.2", # Actual dependencies. "typing-inspect", # pydantic 1.8.2 is incompatible with mypy 0.910. @@ -525,6 +525,7 @@ "qlik-sense": sqlglot_lib | {"requests", "websocket-client"}, "sigma": sqlglot_lib | {"requests"}, "sac": sac, + "neo4j": {"pandas", "neo4j"}, } # This is mainly used to exclude plugins from the Docker image. @@ -673,6 +674,7 @@ "sigma", "sac", "cassandra", + "neo4j", ] if plugin for dependency in plugins[plugin] @@ -792,6 +794,7 @@ "sigma = datahub.ingestion.source.sigma.sigma:SigmaSource", "sac = datahub.ingestion.source.sac.sac:SACSource", "cassandra = datahub.ingestion.source.cassandra.cassandra:CassandraSource", + "neo4j = datahub.ingestion.source.neo4j.neo4j_source:Neo4jSource", ], "datahub.ingestion.transformer.plugins": [ "pattern_cleanup_ownership = datahub.ingestion.transformer.pattern_cleanup_ownership:PatternCleanUpOwnership", diff --git a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py index 9fbb15500a863..a5eecf198a9b4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py @@ -22,6 +22,8 @@ class DatasetSubTypes(StrEnum): SAC_MODEL = "Model" SAC_IMPORT_DATA_MODEL = "Import Data Model" SAC_LIVE_DATA_MODEL = "Live Data Model" + NEO4J_NODE = "Neo4j Node" + NEO4J_RELATIONSHIP = "Neo4j Relationship" # TODO: Create separate entity... 
NOTEBOOK = "Notebook" diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 4598ae388b827..499e7e1231d05 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -53,19 +53,7 @@ make_assertion_from_test, make_assertion_result_from_test, ) -from datahub.ingestion.source.sql.sql_types import ( - ATHENA_SQL_TYPES_MAP, - BIGQUERY_TYPES_MAP, - POSTGRES_TYPES_MAP, - SNOWFLAKE_TYPES_MAP, - SPARK_SQL_TYPES_MAP, - TRINO_SQL_TYPES_MAP, - VERTICA_SQL_TYPES_MAP, - resolve_athena_modified_type, - resolve_postgres_modified_type, - resolve_trino_modified_type, - resolve_vertica_modified_type, -) +from datahub.ingestion.source.sql.sql_types import resolve_sql_type from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalHandler, StaleEntityRemovalSourceReport, @@ -89,17 +77,11 @@ from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.com.linkedin.pegasus2avro.schema import ( - BooleanTypeClass, - DateTypeClass, MySqlDDL, NullTypeClass, - NumberTypeClass, - RecordType, SchemaField, SchemaFieldDataType, SchemaMetadata, - StringTypeClass, - TimeTypeClass, ) from datahub.metadata.schema_classes import ( DataPlatformInstanceClass, @@ -804,28 +786,6 @@ def make_mapping_upstream_lineage( ) -# See https://github.com/fishtown-analytics/dbt/blob/master/core/dbt/adapters/sql/impl.py -_field_type_mapping = { - "boolean": BooleanTypeClass, - "date": DateTypeClass, - "time": TimeTypeClass, - "numeric": NumberTypeClass, - "text": StringTypeClass, - "timestamp with time zone": DateTypeClass, - "timestamp without time zone": DateTypeClass, - "integer": NumberTypeClass, - "float8": NumberTypeClass, - "struct": RecordType, - **POSTGRES_TYPES_MAP, - **SNOWFLAKE_TYPES_MAP, - **BIGQUERY_TYPES_MAP, - **SPARK_SQL_TYPES_MAP, - **TRINO_SQL_TYPES_MAP, - **ATHENA_SQL_TYPES_MAP, - **VERTICA_SQL_TYPES_MAP, -} - - def get_column_type( report: DBTSourceReport, dataset_name: str, @@ -835,24 +795,10 @@ def get_column_type( """ Maps known DBT types to datahub types """ - TypeClass: Any = _field_type_mapping.get(column_type) if column_type else None - - if TypeClass is None and column_type: - # resolve a modified type - if dbt_adapter == "trino": - TypeClass = resolve_trino_modified_type(column_type) - elif dbt_adapter == "athena": - TypeClass = resolve_athena_modified_type(column_type) - elif dbt_adapter == "postgres" or dbt_adapter == "redshift": - # Redshift uses a variant of Postgres, so we can use the same logic. - TypeClass = resolve_postgres_modified_type(column_type) - elif dbt_adapter == "vertica": - TypeClass = resolve_vertica_modified_type(column_type) - elif dbt_adapter == "snowflake": - # Snowflake types are uppercase, so we check that. 
- TypeClass = _field_type_mapping.get(column_type.upper()) - - # if still not found, report the warning + + TypeClass = resolve_sql_type(column_type, dbt_adapter) + + # if still not found, report a warning if TypeClass is None: if column_type: report.info( @@ -861,9 +807,9 @@ def get_column_type( context=f"{dataset_name} - {column_type}", log=False, ) - TypeClass = NullTypeClass + TypeClass = NullTypeClass() - return SchemaFieldDataType(type=TypeClass()) + return SchemaFieldDataType(type=TypeClass) @platform_name("dbt") diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_api.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_api.py index 7b9ccb52acbef..7f4e0f520b7a5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_api.py @@ -774,3 +774,14 @@ def process_source_and_containers(source): containers.extend(future.result()) return containers + + def get_context_for_vds(self, resource_id: str) -> str: + context_array = self.get( + url=f"/catalog/{resource_id}", + ).get("sqlContext") + if context_array: + return ".".join( + f'"{part}"' if "." in part else f"{part}" for part in context_array + ) + else: + return "" diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_aspects.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_aspects.py index b29fc91a25e74..d9d85edbf4f7a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_aspects.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_aspects.py @@ -142,6 +142,7 @@ def __init__( platform: str, ui_url: str, env: str, + ingest_owner: bool, domain: Optional[str] = None, platform_instance: Optional[str] = None, ): @@ -150,6 +151,7 @@ def __init__( self.env = env self.domain = domain self.ui_url = ui_url + self.ingest_owner = ingest_owner def get_container_key( self, name: Optional[str], path: Optional[List[str]] @@ -426,21 +428,23 @@ def _create_external_url(self, dataset: DremioDataset) -> str: return f'{self.ui_url}/{container_type}/{dataset_url_path}"{dataset.resource_name}"' def _create_ownership(self, dataset: DremioDataset) -> Optional[OwnershipClass]: - if not dataset.owner: - return None - owner = ( - make_user_urn(dataset.owner) - if dataset.owner_type == "USER" - else make_group_urn(dataset.owner) - ) - return OwnershipClass( - owners=[ - OwnerClass( - owner=owner, - type=OwnershipTypeClass.TECHNICAL_OWNER, - ) - ] - ) + if self.ingest_owner and dataset.owner: + owner_urn = ( + make_user_urn(dataset.owner) + if dataset.owner_type == "USER" + else make_group_urn(dataset.owner) + ) + ownership: OwnershipClass = OwnershipClass( + owners=[ + OwnerClass( + owner=owner_urn, + type=OwnershipTypeClass.TECHNICAL_OWNER, + ) + ] + ) + return ownership + + return None def _create_glossary_terms(self, entity: DremioDataset) -> GlossaryTermsClass: return GlossaryTermsClass( diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_config.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_config.py index d966d575c0332..b3f2107a1dfaa 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_config.py @@ -174,3 +174,8 @@ def is_profiling_enabled(self) -> bool: default=False, description="Whether to include query-based lineage information.", ) + + ingest_owner: bool = Field( + default=True, + description="Ingest Owner from 
source. This will override Owner info entered from UI", + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_entities.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_entities.py index 16774c2e4a816..b80d7b8e0f912 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_entities.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_entities.py @@ -200,6 +200,7 @@ class DremioDataset: columns: List[DremioDatasetColumn] sql_definition: Optional[str] dataset_type: DremioDatasetType + default_schema: Optional[str] owner: Optional[str] owner_type: Optional[str] created: str @@ -235,6 +236,9 @@ def __init__( if self.sql_definition: self.dataset_type = DremioDatasetType.VIEW + self.default_schema = api_operations.get_context_for_vds( + resource_id=self.resource_id + ) else: self.dataset_type = DremioDatasetType.TABLE diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py index 5b96845ec0496..f814108c37760 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py @@ -97,6 +97,7 @@ class DremioSource(StatefulIngestionSourceBase): - Ownership and Glossary Terms: - Metadata related to ownership of datasets, extracted from Dremio’s ownership model. - Glossary terms and business metadata associated with datasets, providing additional context to the data. + - Note: Ownership information will only be available for the Cloud and Enterprise editions, it will not be available for the Community edition. - Optional SQL Profiling (if enabled): - Table, row, and column statistics can be profiled and ingested via optional SQL queries. 
@@ -123,6 +124,7 @@ def __init__(self, config: DremioSourceConfig, ctx: PipelineContext): self.dremio_aspects = DremioAspects( platform=self.get_platform(), domain=self.config.domain, + ingest_owner=self.config.ingest_owner, platform_instance=self.config.platform_instance, env=self.config.env, ui_url=dremio_api.ui_url, @@ -415,6 +417,7 @@ def process_dataset( view_urn=dataset_urn, view_definition=dataset_info.sql_definition, default_db=self.default_db, + default_schema=dataset_info.default_schema, ) elif dataset_info.dataset_type == DremioDatasetType.TABLE: diff --git a/metadata-ingestion/src/datahub/ingestion/source/neo4j/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/neo4j/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion/src/datahub/ingestion/source/neo4j/neo4j_source.py b/metadata-ingestion/src/datahub/ingestion/source/neo4j/neo4j_source.py new file mode 100644 index 0000000000000..2c9107b967e4f --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/neo4j/neo4j_source.py @@ -0,0 +1,331 @@ +import logging +import time +from dataclasses import dataclass +from typing import Any, Dict, Iterable, List, Optional, Type, Union + +import pandas as pd +from neo4j import GraphDatabase +from pydantic.fields import Field + +from datahub.configuration.source_common import EnvConfigMixin +from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import ( + SupportStatus, + config_class, + platform_name, + support_status, +) +from datahub.ingestion.api.source import Source, SourceReport +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.common.subtypes import DatasetSubTypes +from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaFieldDataType +from datahub.metadata.schema_classes import ( + AuditStampClass, + BooleanTypeClass, + DatasetPropertiesClass, + DateTypeClass, + NullTypeClass, + NumberTypeClass, + OtherSchemaClass, + SchemaFieldClass, + SchemaMetadataClass, + StringTypeClass, + SubTypesClass, + UnionTypeClass, +) + +log = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + +_type_mapping: Dict[Union[Type, str], Type] = { + "list": UnionTypeClass, + "boolean": BooleanTypeClass, + "integer": NumberTypeClass, + "local_date_time": DateTypeClass, + "float": NumberTypeClass, + "string": StringTypeClass, + "date": DateTypeClass, + "node": StringTypeClass, + "relationship": StringTypeClass, +} + + +class Neo4jConfig(EnvConfigMixin): + username: str = Field(description="Neo4j Username") + password: str = Field(description="Neo4j Password") + uri: str = Field(description="The URI for the Neo4j server") + env: str = Field(description="Neo4j env") + + +@dataclass +class Neo4jSourceReport(SourceReport): + obj_failures: int = 0 + obj_created: int = 0 + + +@platform_name("Neo4j", id="neo4j") +@config_class(Neo4jConfig) +@support_status(SupportStatus.CERTIFIED) +class Neo4jSource(Source): + NODE = "node" + RELATIONSHIP = "relationship" + PLATFORM = "neo4j" + + def __init__(self, ctx: PipelineContext, config: Neo4jConfig): + self.ctx = ctx + self.config = config + self.report = Neo4jSourceReport() + + @classmethod + def create(cls, config_dict, ctx): + config = Neo4jConfig.parse_obj(config_dict) + return cls(ctx, config) + + def get_field_type(self, attribute_type: Union[type, 
str]) -> SchemaFieldDataType: + type_class: type = _type_mapping.get(attribute_type, NullTypeClass) + return SchemaFieldDataType(type=type_class()) + + def get_schema_field_class( + self, col_name: str, col_type: str, **kwargs: Any + ) -> SchemaFieldClass: + if kwargs["obj_type"] == self.NODE and col_type == self.RELATIONSHIP: + col_type = self.NODE + else: + col_type = col_type + return SchemaFieldClass( + fieldPath=col_name, + type=self.get_field_type(col_type), + nativeDataType=col_type, + description=col_type.upper() + if col_type in (self.NODE, self.RELATIONSHIP) + else col_type, + lastModified=AuditStampClass( + time=round(time.time() * 1000), actor="urn:li:corpuser:ingestion" + ), + ) + + def add_properties( + self, + dataset: str, + description: Optional[str] = None, + custom_properties: Optional[Dict[str, str]] = None, + ) -> MetadataChangeProposalWrapper: + dataset_properties = DatasetPropertiesClass( + description=description, + customProperties=custom_properties, + ) + return MetadataChangeProposalWrapper( + entityUrn=make_dataset_urn( + platform=self.PLATFORM, name=dataset, env=self.config.env + ), + aspect=dataset_properties, + ) + + def generate_neo4j_object( + self, dataset: str, columns: list, obj_type: Optional[str] = None + ) -> MetadataChangeProposalWrapper: + try: + fields = [ + self.get_schema_field_class(key, value.lower(), obj_type=obj_type) + for d in columns + for key, value in d.items() + ] + mcp = MetadataChangeProposalWrapper( + entityUrn=make_dataset_urn( + platform=self.PLATFORM, name=dataset, env=self.config.env + ), + aspect=SchemaMetadataClass( + schemaName=dataset, + platform=make_data_platform_urn(self.PLATFORM), + version=0, + hash="", + platformSchema=OtherSchemaClass(rawSchema=""), + lastModified=AuditStampClass( + time=round(time.time() * 1000), + actor="urn:li:corpuser:ingestion", + ), + fields=fields, + ), + ) + self.report.obj_created += 1 + except Exception as e: + log.error(e) + self.report.obj_failures += 1 + return mcp + + def get_neo4j_metadata(self, query: str) -> pd.DataFrame: + driver = GraphDatabase.driver( + self.config.uri, auth=(self.config.username, self.config.password) + ) + """ + This process retrieves the metadata for Neo4j objects using an APOC query, which returns a dictionary + with two columns: key and value. The key represents the Neo4j object, while the value contains the + corresponding metadata. + + When data is returned from Neo4j, much of the relationship metadata is stored with the relevant node's + metadata. Consequently, the objects are organized into two separate dataframes: one for nodes and one for + relationships. + + In the node dataframe, several fields are extracted and added as new columns. Similarly, in the relationship + dataframe, certain fields are parsed out, while others require metadata from the nodes dataframe. + + Once the data is parsed and these two dataframes are created, we combine a subset of their columns into a + single dataframe, which will be used to create the DataHub objects. 
+ + See the docs for examples of metadata: metadata-ingestion/docs/sources/neo4j/neo4j.md + """ + try: + log.info(f"{query}") + with driver.session() as session: + result = session.run(query) + data = [record for record in result] + log.info("Closing Neo4j driver") + driver.close() + + node_df = self.process_nodes(data) + rel_df = self.process_relationships(data, node_df) + + union_cols = ["key", "obj_type", "property_data_types", "description"] + df = pd.concat([node_df[union_cols], rel_df[union_cols]]) + except Exception as e: + self.report.failure( + message="Failed to get neo4j metadata", + exc=e, + ) + + return df + + def process_nodes(self, data: list) -> pd.DataFrame: + nodes = [record for record in data if record["value"]["type"] == self.NODE] + node_df = pd.DataFrame( + nodes, + columns=["key", "value"], + ) + node_df["obj_type"] = node_df["value"].apply( + lambda record: self.get_obj_type(record) + ) + node_df["relationships"] = node_df["value"].apply( + lambda record: self.get_relationships(record) + ) + node_df["properties"] = node_df["value"].apply( + lambda record: self.get_properties(record) + ) + node_df["property_data_types"] = node_df["properties"].apply( + lambda record: self.get_property_data_types(record) + ) + node_df["description"] = node_df.apply( + lambda record: self.get_node_description(record, node_df), axis=1 + ) + return node_df + + def process_relationships(self, data: list, node_df: pd.DataFrame) -> pd.DataFrame: + rels = [ + record for record in data if record["value"]["type"] == self.RELATIONSHIP + ] + rel_df = pd.DataFrame(rels, columns=["key", "value"]) + rel_df["obj_type"] = rel_df["value"].apply( + lambda record: self.get_obj_type(record) + ) + rel_df["properties"] = rel_df["value"].apply( + lambda record: self.get_properties(record) + ) + rel_df["property_data_types"] = rel_df["properties"].apply( + lambda record: self.get_property_data_types(record) + ) + rel_df["description"] = rel_df.apply( + lambda record: self.get_rel_descriptions(record, node_df), axis=1 + ) + return rel_df + + def get_obj_type(self, record: dict) -> str: + return record["type"] + + def get_rel_descriptions(self, record: dict, df: pd.DataFrame) -> str: + descriptions = [] + for _, row in df.iterrows(): + relationships = row.get("relationships", {}) + for relationship, props in relationships.items(): + if record["key"] == relationship: + if props["direction"] == "in": + for prop in props["labels"]: + descriptions.append( + f"({row['key']})-[{record['key']}]->({prop})" + ) + return "\n".join(descriptions) + + def get_node_description(self, record: dict, df: pd.DataFrame) -> str: + descriptions = [] + for _, row in df.iterrows(): + if record["key"] == row["key"]: + for relationship, props in row["relationships"].items(): + direction = props["direction"] + for node in set(props["labels"]): + if direction == "in": + descriptions.append( + f"({row['key']})<-[{relationship}]-({node})" + ) + elif direction == "out": + descriptions.append( + f"({row['key']})-[{relationship}]->({node})" + ) + + return "\n".join(descriptions) + + def get_property_data_types(self, record: dict) -> List[dict]: + return [{k: v["type"]} for k, v in record.items()] + + def get_properties(self, record: dict) -> str: + return record["properties"] + + def get_relationships(self, record: dict) -> dict: + return record.get("relationships", None) + + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + df = self.get_neo4j_metadata( + "CALL apoc.meta.schema() YIELD value UNWIND keys(value) AS key RETURN 
key, value[key] AS value;" + ) + for index, row in df.iterrows(): + try: + yield MetadataWorkUnit( + id=row["key"], + mcp=self.generate_neo4j_object( + columns=row["property_data_types"], + dataset=row["key"], + ), + is_primary_source=True, + ) + + yield MetadataWorkUnit( + id=row["key"], + mcp=MetadataChangeProposalWrapper( + entityUrn=make_dataset_urn( + platform=self.PLATFORM, + name=row["key"], + env=self.config.env, + ), + aspect=SubTypesClass( + typeNames=[ + DatasetSubTypes.NEO4J_NODE + if row["obj_type"] == self.NODE + else DatasetSubTypes.NEO4J_RELATIONSHIP + ] + ), + ), + ) + + yield MetadataWorkUnit( + id=row["key"], + mcp=self.add_properties( + dataset=row["key"], + custom_properties=None, + description=row["description"], + ), + ) + + except Exception as e: + raise e + + def get_report(self): + return self.report diff --git a/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/data_classes.py index 672fcbceb0603..a43f5f32493f2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/data_classes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/data_classes.py @@ -15,6 +15,7 @@ TimeType, ) +# TODO: Replace with standardized types in sql_types.py FIELD_TYPE_MAPPING: Dict[ str, Type[ diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 4bc4c1451c262..06cbb7fbae27c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -222,6 +222,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): ``` """ + # TODO: Replace with standardized types in sql_types.py REDSHIFT_FIELD_TYPE_MAPPINGS: Dict[ str, Type[ diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py index d4442749a0622..2bd8e8017f549 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py @@ -103,6 +103,7 @@ logger = logging.getLogger(__name__) # https://docs.snowflake.com/en/sql-reference/intro-summary-data-types.html +# TODO: Move to the standardized types in sql_types.py SNOWFLAKE_FIELD_TYPE_MAPPINGS = { "DATE": DateType, "BIGINT": NumberType, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py index 71cfd0268ee6b..6f7decc79b1df 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py @@ -26,6 +26,7 @@ platform_name, support_status, ) +from datahub.ingestion.api.source import StructuredLogLevel from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.aws.s3_util import make_s3_urn from datahub.ingestion.source.common.subtypes import DatasetContainerSubTypes @@ -35,6 +36,7 @@ register_custom_type, ) from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri +from datahub.ingestion.source.sql.sql_report import SQLSourceReport from datahub.ingestion.source.sql.sql_utils import ( add_table_to_schema_container, gen_database_container, @@ -48,6 +50,15 @@ get_schema_fields_for_sqlalchemy_column, ) +try: + from 
typing_extensions import override +except ImportError: + _F = typing.TypeVar("_F", bound=typing.Callable[..., typing.Any]) + + def override(f: _F, /) -> _F: # noqa: F811 + return f + + logger = logging.getLogger(__name__) assert STRUCT, "required type modules are not available" @@ -322,12 +333,15 @@ class AthenaSource(SQLAlchemySource): - Profiling when enabled. """ - table_partition_cache: Dict[str, Dict[str, Partitionitem]] = {} + config: AthenaConfig + report: SQLSourceReport def __init__(self, config, ctx): super().__init__(config, ctx, "athena") self.cursor: Optional[BaseCursor] = None + self.table_partition_cache: Dict[str, Dict[str, Partitionitem]] = {} + @classmethod def create(cls, config_dict, ctx): config = AthenaConfig.parse_obj(config_dict) @@ -452,6 +466,7 @@ def add_table_to_schema_container( ) # It seems like database/schema filter in the connection string does not work and this to work around that + @override def get_schema_names(self, inspector: Inspector) -> List[str]: athena_config = typing.cast(AthenaConfig, self.config) schemas = inspector.get_schema_names() @@ -459,34 +474,42 @@ def get_schema_names(self, inspector: Inspector) -> List[str]: return [schema for schema in schemas if schema == athena_config.database] return schemas - # Overwrite to get partitions + @classmethod + def _casted_partition_key(cls, key: str) -> str: + # We need to cast the partition keys to a VARCHAR, since otherwise + # Athena may throw an error during concatenation / comparison. + return f"CAST({key} as VARCHAR)" + + @override def get_partitions( self, inspector: Inspector, schema: str, table: str - ) -> List[str]: - partitions = [] - - athena_config = typing.cast(AthenaConfig, self.config) - - if not athena_config.extract_partitions: - return [] + ) -> Optional[List[str]]: + if not self.config.extract_partitions: + return None if not self.cursor: - return [] + return None metadata: AthenaTableMetadata = self.cursor.get_table_metadata( table_name=table, schema_name=schema ) - if metadata.partition_keys: - for key in metadata.partition_keys: - if key.name: - partitions.append(key.name) - - if not partitions: - return [] + partitions = [] + for key in metadata.partition_keys: + if key.name: + partitions.append(key.name) + if not partitions: + return [] - # We create an artiificaial concatenated partition key to be able to query max partition easier - part_concat = "|| '-' ||".join(partitions) + with self.report.report_exc( + message="Failed to extract partition details", + context=f"{schema}.{table}", + level=StructuredLogLevel.WARN, + ): + # We create an artifical concatenated partition key to be able to query max partition easier + part_concat = " || '-' || ".join( + self._casted_partition_key(key) for key in partitions + ) max_partition_query = f'select {",".join(partitions)} from "{schema}"."{table}$partitions" where {part_concat} = (select max({part_concat}) from "{schema}"."{table}$partitions")' ret = self.cursor.execute(max_partition_query) max_partition: Dict[str, str] = {} @@ -500,9 +523,8 @@ def get_partitions( partitions=partitions, max_partition=max_partition, ) - return partitions - return [] + return partitions # Overwrite to modify the creation of schema fields def get_schema_fields_for_column( @@ -551,7 +573,9 @@ def generate_partition_profiler_query( if partition and partition.max_partition: max_partition_filters = [] for key, value in partition.max_partition.items(): - max_partition_filters.append(f"CAST({key} as VARCHAR) = '{value}'") + max_partition_filters.append( + 
f"{self._casted_partition_key(key)} = '{value}'" + ) max_partition = str(partition.max_partition) return ( max_partition, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py index 8ea4209784063..89ca160ba1f48 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py @@ -1,5 +1,5 @@ import re -from typing import Any, Dict, ValuesView +from typing import Any, Dict, Optional, Type, Union, ValuesView from datahub.metadata.com.linkedin.pegasus2avro.schema import ( ArrayType, @@ -16,14 +16,28 @@ UnionType, ) -# these can be obtained by running `select format_type(oid, null),* from pg_type;` -# we've omitted the types without a meaningful DataHub type (e.g. postgres-specific types, index vectors, etc.) -# (run `\copy (select format_type(oid, null),* from pg_type) to 'pg_type.csv' csv header;` to get a CSV) +DATAHUB_FIELD_TYPE = Union[ + ArrayType, + BooleanType, + BytesType, + DateType, + EnumType, + MapType, + NullType, + NumberType, + RecordType, + StringType, + TimeType, + UnionType, +] -# we map from format_type since this is what dbt uses -# see https://github.com/fishtown-analytics/dbt/blob/master/plugins/postgres/dbt/include/postgres/macros/catalog.sql#L22 -# see https://www.npgsql.org/dev/types.html for helpful type annotations +# These can be obtained by running `select format_type(oid, null),* from pg_type;` +# We've omitted the types without a meaningful DataHub type (e.g. postgres-specific types, index vectors, etc.) +# (run `\copy (select format_type(oid, null),* from pg_type) to 'pg_type.csv' csv header;` to get a CSV) +# We map from format_type since this is what dbt uses. +# See https://github.com/fishtown-analytics/dbt/blob/master/plugins/postgres/dbt/include/postgres/macros/catalog.sql#L22 +# See https://www.npgsql.org/dev/types.html for helpful type annotations POSTGRES_TYPES_MAP: Dict[str, Any] = { "boolean": BooleanType, "bytea": BytesType, @@ -430,3 +444,54 @@ def resolve_vertica_modified_type(type_string: str) -> Any: "geography": None, "uuid": StringType, } + + +_merged_mapping = { + "boolean": BooleanType, + "date": DateType, + "time": TimeType, + "numeric": NumberType, + "text": StringType, + "timestamp with time zone": DateType, + "timestamp without time zone": DateType, + "integer": NumberType, + "float8": NumberType, + "struct": RecordType, + **POSTGRES_TYPES_MAP, + **SNOWFLAKE_TYPES_MAP, + **BIGQUERY_TYPES_MAP, + **SPARK_SQL_TYPES_MAP, + **TRINO_SQL_TYPES_MAP, + **ATHENA_SQL_TYPES_MAP, + **VERTICA_SQL_TYPES_MAP, +} + + +def resolve_sql_type( + column_type: Optional[str], + platform: Optional[str] = None, +) -> Optional[DATAHUB_FIELD_TYPE]: + # In theory, we should use the platform-specific mapping where available. + # However, the types don't ever conflict, so the merged mapping is fine. + TypeClass: Optional[Type[DATAHUB_FIELD_TYPE]] = ( + _merged_mapping.get(column_type) if column_type else None + ) + + if TypeClass is None and column_type: + # resolve a modified type + if platform == "trino": + TypeClass = resolve_trino_modified_type(column_type) + elif platform == "athena": + TypeClass = resolve_athena_modified_type(column_type) + elif platform == "postgres" or platform == "redshift": + # Redshift uses a variant of Postgres, so we can use the same logic. 
+ TypeClass = resolve_postgres_modified_type(column_type) + elif platform == "vertica": + TypeClass = resolve_vertica_modified_type(column_type) + elif platform == "snowflake": + # Snowflake types are uppercase, so we check that. + TypeClass = _merged_mapping.get(column_type.upper()) + + if TypeClass: + return TypeClass() + return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py index f84f6c1b0c08d..9c5752c518df1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py @@ -33,6 +33,7 @@ logger = logging.getLogger(__name__) +# TODO: (maybe) Replace with standardized types in sql_types.py DATA_TYPE_REGISTRY: dict = { ColumnTypeName.BOOLEAN: BooleanTypeClass, ColumnTypeName.BYTE: BytesTypeClass, diff --git a/metadata-ingestion/src/datahub/utilities/urn_encoder.py b/metadata-ingestion/src/datahub/utilities/urn_encoder.py index 88c0a128b8e46..4f19eeff3e70f 100644 --- a/metadata-ingestion/src/datahub/utilities/urn_encoder.py +++ b/metadata-ingestion/src/datahub/utilities/urn_encoder.py @@ -4,7 +4,8 @@ # NOTE: Frontend relies on encoding these three characters. Specifically, we decode and encode schema fields for column level lineage. # If this changes, make appropriate changes to datahub-web-react/src/app/lineage/utils/columnLineageUtils.ts # We also rely on encoding these exact three characters when generating schemaField urns in our graphQL layer. Update SchemaFieldUtils if this changes. -RESERVED_CHARS = {",", "(", ")"} +# Also see https://datahubproject.io/docs/what/urn/#restrictions +RESERVED_CHARS = {",", "(", ")", "␟"} RESERVED_CHARS_EXTENDED = RESERVED_CHARS.union({"%"}) diff --git a/metadata-ingestion/tests/integration/dbt/test_dbt.py b/metadata-ingestion/tests/integration/dbt/test_dbt.py index 390d8d7698dd4..c6a3dc4fd590b 100644 --- a/metadata-ingestion/tests/integration/dbt/test_dbt.py +++ b/metadata-ingestion/tests/integration/dbt/test_dbt.py @@ -11,12 +11,6 @@ from datahub.ingestion.run.pipeline_config import PipelineConfig, SourceConfig from datahub.ingestion.source.dbt.dbt_common import DBTEntitiesEnabled, EmitDirective from datahub.ingestion.source.dbt.dbt_core import DBTCoreConfig, DBTCoreSource -from datahub.ingestion.source.sql.sql_types import ( - ATHENA_SQL_TYPES_MAP, - TRINO_SQL_TYPES_MAP, - resolve_athena_modified_type, - resolve_trino_modified_type, -) from tests.test_helpers import mce_helpers, test_connection_helpers FROZEN_TIME = "2022-02-03 07:00:00" @@ -362,69 +356,6 @@ def test_dbt_tests(test_resources_dir, pytestconfig, tmp_path, mock_time, **kwar ) -@pytest.mark.parametrize( - "data_type, expected_data_type", - [ - ("boolean", "boolean"), - ("tinyint", "tinyint"), - ("smallint", "smallint"), - ("int", "int"), - ("integer", "integer"), - ("bigint", "bigint"), - ("real", "real"), - ("double", "double"), - ("decimal(10,0)", "decimal"), - ("varchar(20)", "varchar"), - ("char", "char"), - ("varbinary", "varbinary"), - ("json", "json"), - ("date", "date"), - ("time", "time"), - ("time(12)", "time"), - ("timestamp", "timestamp"), - ("timestamp(3)", "timestamp"), - ("row(x bigint, y double)", "row"), - ("array(row(x bigint, y double))", "array"), - ("map(varchar, varchar)", "map"), - ], -) -def test_resolve_trino_modified_type(data_type, expected_data_type): - assert ( - resolve_trino_modified_type(data_type) - == TRINO_SQL_TYPES_MAP[expected_data_type] - ) - - 
-@pytest.mark.parametrize( - "data_type, expected_data_type", - [ - ("boolean", "boolean"), - ("tinyint", "tinyint"), - ("smallint", "smallint"), - ("int", "int"), - ("integer", "integer"), - ("bigint", "bigint"), - ("float", "float"), - ("double", "double"), - ("decimal(10,0)", "decimal"), - ("varchar(20)", "varchar"), - ("char", "char"), - ("binary", "binary"), - ("date", "date"), - ("timestamp", "timestamp"), - ("timestamp(3)", "timestamp"), - ("struct", "struct"), - ("array>", "array"), - ("map", "map"), - ], -) -def test_resolve_athena_modified_type(data_type, expected_data_type): - assert ( - resolve_athena_modified_type(data_type) - == ATHENA_SQL_TYPES_MAP[expected_data_type] - ) - - @pytest.mark.integration @freeze_time(FROZEN_TIME) def test_dbt_tests_only_assertions( diff --git a/metadata-ingestion/tests/unit/test_athena_source.py b/metadata-ingestion/tests/unit/test_athena_source.py index 875cf3800daf8..f8b6220d18273 100644 --- a/metadata-ingestion/tests/unit/test_athena_source.py +++ b/metadata-ingestion/tests/unit/test_athena_source.py @@ -93,7 +93,8 @@ def test_athena_get_table_properties(): "CreateTime": datetime.now(), "LastAccessTime": datetime.now(), "PartitionKeys": [ - {"Name": "testKey", "Type": "string", "Comment": "testComment"} + {"Name": "year", "Type": "string", "Comment": "testComment"}, + {"Name": "month", "Type": "string", "Comment": "testComment"}, ], "Parameters": { "comment": "testComment", @@ -112,8 +113,18 @@ def test_athena_get_table_properties(): response=table_metadata ) + # Mock partition query results + mock_cursor.execute.return_value.description = [ + ["year"], + ["month"], + ] + mock_cursor.execute.return_value.__iter__.return_value = [["2023", "12"]] + ctx = PipelineContext(run_id="test") source = AthenaSource(config=config, ctx=ctx) + source.cursor = mock_cursor + + # Test table properties description, custom_properties, location = source.get_table_properties( inspector=mock_inspector, table=table, schema=schema ) @@ -124,13 +135,35 @@ def test_athena_get_table_properties(): "last_access_time": "2020-04-14 07:00:00", "location": "s3://testLocation", "outputformat": "testOutputFormat", - "partition_keys": '[{"name": "testKey", "type": "string", "comment": "testComment"}]', + "partition_keys": '[{"name": "year", "type": "string", "comment": "testComment"}, {"name": "month", "type": "string", "comment": "testComment"}]', "serde.serialization.lib": "testSerde", "table_type": "testType", } - assert location == make_s3_urn("s3://testLocation", "PROD") + # Test partition functionality + partitions = source.get_partitions( + inspector=mock_inspector, schema=schema, table=table + ) + assert partitions == ["year", "month"] + + # Verify the correct SQL query was generated for partitions + expected_query = """\ +select year,month from "test_schema"."test_table$partitions" \ +where CAST(year as VARCHAR) || '-' || CAST(month as VARCHAR) = \ +(select max(CAST(year as VARCHAR) || '-' || CAST(month as VARCHAR)) \ +from "test_schema"."test_table$partitions")""" + mock_cursor.execute.assert_called_once() + actual_query = mock_cursor.execute.call_args[0][0] + assert actual_query == expected_query + + # Verify partition cache was populated correctly + assert source.table_partition_cache[schema][table].partitions == partitions + assert source.table_partition_cache[schema][table].max_partition == { + "year": "2023", + "month": "12", + } + def test_get_column_type_simple_types(): assert isinstance( @@ -214,3 +247,9 @@ def test_column_type_complex_combination(): assert 
isinstance( result._STRUCT_fields[2][1].item_type._STRUCT_fields[1][1], types.String ) + + +def test_casted_partition_key(): + from datahub.ingestion.source.sql.athena import AthenaSource + + assert AthenaSource._casted_partition_key("test_col") == "CAST(test_col as VARCHAR)" diff --git a/metadata-ingestion/tests/unit/test_neo4j_source.py b/metadata-ingestion/tests/unit/test_neo4j_source.py new file mode 100644 index 0000000000000..62586718e8606 --- /dev/null +++ b/metadata-ingestion/tests/unit/test_neo4j_source.py @@ -0,0 +1,221 @@ +import unittest +from pathlib import Path + +import pandas as pd +import pytest + +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.neo4j.neo4j_source import Neo4jConfig, Neo4jSource + + +@pytest.fixture +def tracking_uri(tmp_path: Path) -> str: + # return str(tmp_path / "neo4j") + return "neo4j+ssc://host:7687" + + +@pytest.fixture +def source(tracking_uri: str) -> Neo4jSource: + return Neo4jSource( + ctx=PipelineContext(run_id="neo4j-test"), + config=Neo4jConfig( + uri=tracking_uri, env="Prod", username="test", password="test" + ), + ) + + +def data(): + return [ + { + "key": "Node_1", + "value": { + "count": 433026, + "relationships": { + "RELATIONSHIP_1": { + "count": 1, + "properties": { + "Relationship1_Property1": { + "existence": False, + "type": "STRING", + "indexed": False, + "array": False, + } + }, + "direction": "in", + "labels": ["Node_2"], + } + }, + "RELATIONSHIP_2": { + "count": 2, + "properties": { + "Relationship2_Property1": { + "existence": False, + "type": "STRING", + "indexed": False, + "array": False, + } + }, + "direction": "in", + "labels": ["Node_3"], + }, + "type": "node", + "properties": { + "Node1_Property1": { + "existence": False, + "type": "DATE", + "indexed": False, + "unique": False, + }, + "Node1_Property2": { + "existence": False, + "type": "STRING", + "indexed": False, + "unique": False, + }, + "Node1_Property3": { + "existence": False, + "type": "STRING", + "indexed": False, + "unique": False, + }, + }, + "labels": [], + }, + }, + { + "key": "Node_2", + "value": { + "count": 3, + "relationships": { + "RELATIONSHIP_1": { + "count": 1, + "properties": { + "Relationship1_Property1": { + "existence": False, + "type": "STRING", + "indexed": False, + "array": False, + } + }, + "direction": "out", + "labels": ["Node_2"], + } + }, + "type": "node", + "properties": { + "Node2_Property1": { + "existence": False, + "type": "DATE", + "indexed": False, + "unique": False, + }, + "Node2_Property2": { + "existence": False, + "type": "STRING", + "indexed": False, + "unique": False, + }, + "Node2_Property3": { + "existence": False, + "type": "STRING", + "indexed": False, + "unique": False, + }, + }, + "labels": [], + }, + }, + { + "key": "RELATIONSHIP_1", + "value": { + "count": 4, + "type": "relationship", + "properties": { + "Relationship1_Property1": { + "existence": False, + "type": "STRING", + "indexed": False, + "array": False, + } + }, + }, + }, + ] + + +def test_process_nodes(source): + df = source.process_nodes(data=data()) + assert type(df) is pd.DataFrame + + +def test_process_relationships(source): + df = source.process_relationships( + data=data(), node_df=source.process_nodes(data=data()) + ) + assert type(df) is pd.DataFrame + + +def test_get_obj_type(source): + results = data() + assert source.get_obj_type(results[0]["value"]) == "node" + assert source.get_obj_type(results[1]["value"]) == "node" + assert source.get_obj_type(results[2]["value"]) == "relationship" + + +def 
test_get_node_description(source): + results = data() + df = source.process_nodes(data=data()) + assert ( + source.get_node_description(results[0], df) + == "(Node_1)<-[RELATIONSHIP_1]-(Node_2)" + ) + assert ( + source.get_node_description(results[1], df) + == "(Node_2)-[RELATIONSHIP_1]->(Node_2)" + ) + + +def test_get_property_data_types(source): + results = data() + assert source.get_property_data_types(results[0]["value"]["properties"]) == [ + {"Node1_Property1": "DATE"}, + {"Node1_Property2": "STRING"}, + {"Node1_Property3": "STRING"}, + ] + assert source.get_property_data_types(results[1]["value"]["properties"]) == [ + {"Node2_Property1": "DATE"}, + {"Node2_Property2": "STRING"}, + {"Node2_Property3": "STRING"}, + ] + assert source.get_property_data_types(results[2]["value"]["properties"]) == [ + {"Relationship1_Property1": "STRING"} + ] + + +def test_get_properties(source): + results = data() + assert list(source.get_properties(results[0]["value"]).keys()) == [ + "Node1_Property1", + "Node1_Property2", + "Node1_Property3", + ] + assert list(source.get_properties(results[1]["value"]).keys()) == [ + "Node2_Property1", + "Node2_Property2", + "Node2_Property3", + ] + assert list(source.get_properties(results[2]["value"]).keys()) == [ + "Relationship1_Property1" + ] + + +def test_get_relationships(source): + results = data() + record = list( + results[0]["value"]["relationships"].keys() + ) # Get the first key from the dict_keys + assert record == ["RELATIONSHIP_1"] + + +if __name__ == "__main__": + unittest.main() diff --git a/metadata-ingestion/tests/unit/test_sql_types.py b/metadata-ingestion/tests/unit/test_sql_types.py new file mode 100644 index 0000000000000..ebe5ade115cdd --- /dev/null +++ b/metadata-ingestion/tests/unit/test_sql_types.py @@ -0,0 +1,78 @@ +import pytest + +from datahub.ingestion.source.sql.sql_types import ( + ATHENA_SQL_TYPES_MAP, + TRINO_SQL_TYPES_MAP, + resolve_athena_modified_type, + resolve_sql_type, + resolve_trino_modified_type, +) +from datahub.metadata.schema_classes import BooleanTypeClass, StringTypeClass + + +@pytest.mark.parametrize( + "data_type, expected_data_type", + [ + ("boolean", "boolean"), + ("tinyint", "tinyint"), + ("smallint", "smallint"), + ("int", "int"), + ("integer", "integer"), + ("bigint", "bigint"), + ("real", "real"), + ("double", "double"), + ("decimal(10,0)", "decimal"), + ("varchar(20)", "varchar"), + ("char", "char"), + ("varbinary", "varbinary"), + ("json", "json"), + ("date", "date"), + ("time", "time"), + ("time(12)", "time"), + ("timestamp", "timestamp"), + ("timestamp(3)", "timestamp"), + ("row(x bigint, y double)", "row"), + ("array(row(x bigint, y double))", "array"), + ("map(varchar, varchar)", "map"), + ], +) +def test_resolve_trino_modified_type(data_type, expected_data_type): + assert ( + resolve_trino_modified_type(data_type) + == TRINO_SQL_TYPES_MAP[expected_data_type] + ) + + +@pytest.mark.parametrize( + "data_type, expected_data_type", + [ + ("boolean", "boolean"), + ("tinyint", "tinyint"), + ("smallint", "smallint"), + ("int", "int"), + ("integer", "integer"), + ("bigint", "bigint"), + ("float", "float"), + ("double", "double"), + ("decimal(10,0)", "decimal"), + ("varchar(20)", "varchar"), + ("char", "char"), + ("binary", "binary"), + ("date", "date"), + ("timestamp", "timestamp"), + ("timestamp(3)", "timestamp"), + ("struct", "struct"), + ("array>", "array"), + ("map", "map"), + ], +) +def test_resolve_athena_modified_type(data_type, expected_data_type): + assert ( + resolve_athena_modified_type(data_type) + == 
ATHENA_SQL_TYPES_MAP[expected_data_type] + ) + + +def test_resolve_sql_type() -> None: + assert resolve_sql_type("boolean") == BooleanTypeClass() + assert resolve_sql_type("varchar") == StringTypeClass() diff --git a/metadata-ingestion/tests/unit/urns/test_urn.py b/metadata-ingestion/tests/unit/urns/test_urn.py index 1bf48082fec8c..73badb3d1b423 100644 --- a/metadata-ingestion/tests/unit/urns/test_urn.py +++ b/metadata-ingestion/tests/unit/urns/test_urn.py @@ -1,6 +1,12 @@ import pytest -from datahub.metadata.urns import DatasetUrn, Urn +from datahub.metadata.urns import ( + CorpUserUrn, + DashboardUrn, + DataPlatformUrn, + DatasetUrn, + Urn, +) from datahub.utilities.urns.error import InvalidUrnError pytestmark = pytest.mark.filterwarnings("ignore::DeprecationWarning") @@ -36,20 +42,51 @@ def test_url_encode_urn() -> None: def test_invalid_urn() -> None: with pytest.raises(InvalidUrnError): - Urn.create_from_string("urn:li:abc") + Urn.from_string("urn:li:abc") with pytest.raises(InvalidUrnError): - Urn.create_from_string("urn:li:abc:") + Urn.from_string("urn:li:abc:") with pytest.raises(InvalidUrnError): - Urn.create_from_string("urn:li:abc:()") + Urn.from_string("urn:li:abc:()") with pytest.raises(InvalidUrnError): - Urn.create_from_string("urn:li:abc:(abc,)") + Urn.from_string("urn:li:abc:(abc,)") + + with pytest.raises(InvalidUrnError): + Urn.from_string("urn:li:corpuser:abc)") + + +def test_urn_colon() -> None: + # Colon characters are valid in urns, and should not mess up parsing. + + urn = Urn.from_string( + "urn:li:dashboard:(looker,dashboards.thelook::customer_lookup)" + ) + assert isinstance(urn, DashboardUrn) + + assert DataPlatformUrn.from_string("urn:li:dataPlatform:abc:def") + assert DatasetUrn.from_string( + "urn:li:dataset:(urn:li:dataPlatform:abc:def,table_name,PROD)" + ) + assert Urn.from_string("urn:li:corpuser:foo:bar@example.com") + + # I'm not sure why you'd ever want this, but technically it's a valid urn. + urn = Urn.from_string("urn:li:corpuser::") + assert isinstance(urn, CorpUserUrn) + assert urn.username == ":" + assert urn == CorpUserUrn(":") + + +def test_urn_coercion() -> None: + urn = CorpUserUrn("foo␟bar") + assert urn.urn() == "urn:li:corpuser:foo%E2%90%9Fbar" + + assert urn == Urn.from_string(urn.urn()) def test_urn_type_dispatch() -> None: - urn = Urn.from_string("urn:li:dataset:(urn:li:dataPlatform:abc,def,prod)") + urn = Urn.from_string("urn:li:dataset:(urn:li:dataPlatform:abc,def,PROD)") assert isinstance(urn, DatasetUrn) with pytest.raises(InvalidUrnError, match="Passed an urn of type corpuser"): diff --git a/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml b/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml index 1625df4a99540..0b3d815c71098 100644 --- a/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml +++ b/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml @@ -727,3 +727,14 @@ displayName: Cassandra type: KEY_VALUE_STORE logoUrl: "/assets/platforms/cassandralogo.png" +- entityUrn: urn:li:dataPlatform:neo4j + entityType: dataPlatform + aspectName: dataPlatformInfo + changeType: UPSERT + aspect: + datasetNameDelimiter: "." + name: neo4j + displayName: Neo4j + type: OTHERS + logoUrl: "/assets/platforms/neo4j.png" +
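
Editor's note: a minimal sketch of wiring the new Neo4j source into an ingestion pipeline. The config keys match Neo4jConfig above; the URI and credentials are placeholders, and it assumes the source is registered under the `neo4j` type as the rest of the PR implies. This is not code from the PR.

    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(
        {
            "run_id": "neo4j-ingest-example",
            "source": {
                "type": "neo4j",
                "config": {
                    "uri": "neo4j+ssc://localhost:7687",  # placeholder
                    "username": "neo4j",  # placeholder
                    "password": "password",  # placeholder
                    "env": "PROD",
                },
            },
            # Console sink just prints the emitted MCPs; swap for datahub-rest in practice.
            "sink": {"type": "console"},
        }
    )
    pipeline.run()
    pipeline.raise_from_status()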
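
Editor's note: an illustrative sketch (not code from the PR) of how the casted partition keys feed into Athena's max-partition lookup; the schema, table, and column names are taken from the updated unit test above.

    partitions = ["year", "month"]
    schema, table = "test_schema", "test_table"

    def casted(key: str) -> str:
        # Mirrors AthenaSource._casted_partition_key: cast each key to VARCHAR so
        # concatenation and comparison work for non-string partition columns.
        return f"CAST({key} as VARCHAR)"

    part_concat = " || '-' || ".join(casted(key) for key in partitions)
    max_partition_query = (
        f'select {",".join(partitions)} from "{schema}"."{table}$partitions" '
        f"where {part_concat} = (select max({part_concat}) "
        f'from "{schema}"."{table}$partitions")'
    )
    # Produces the same query the test asserts against:
    # select year,month from "test_schema"."test_table$partitions" where
    # CAST(year as VARCHAR) || '-' || CAST(month as VARCHAR) = (select
    # max(CAST(year as VARCHAR) || '-' || CAST(month as VARCHAR)) from
    # "test_schema"."test_table$partitions")
    print(max_partition_query)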
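
Editor's note: a usage sketch for the new resolve_sql_type helper in sql_types.py. The "boolean" and "varchar" assertions mirror the new unit test; the "varchar(20)"/trino and unknown-type cases are my own illustrations of the fallback behaviour described in the resolver, not assertions from the PR.

    from datahub.ingestion.source.sql.sql_types import resolve_sql_type
    from datahub.metadata.schema_classes import BooleanTypeClass, StringTypeClass

    # Platform-agnostic lookup against the merged mapping.
    assert resolve_sql_type("boolean") == BooleanTypeClass()
    assert resolve_sql_type("varchar") == StringTypeClass()

    # With a platform hint, modified types fall back to the per-platform resolvers.
    assert resolve_sql_type("varchar(20)", platform="trino") == StringTypeClass()

    # Unknown types resolve to None rather than raising.
    assert resolve_sql_type("definitely_not_a_type") is None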