diff --git a/.github/workflows/airflow-plugin.yml b/.github/workflows/airflow-plugin.yml
index 66a08dc63aa0de..1fdfc52857b011 100644
--- a/.github/workflows/airflow-plugin.yml
+++ b/.github/workflows/airflow-plugin.yml
@@ -34,29 +34,21 @@ jobs:
include:
# Note: this should be kept in sync with tox.ini.
- python-version: "3.8"
- extra_pip_requirements: "apache-airflow~=2.1.4"
- extra_pip_extras: plugin-v1
- - python-version: "3.8"
- extra_pip_requirements: "apache-airflow~=2.2.4"
- extra_pip_extras: plugin-v1
+ extra_pip_requirements: "apache-airflow~=2.3.4"
+ extra_pip_extras: test-airflow23
- python-version: "3.10"
extra_pip_requirements: "apache-airflow~=2.4.3"
- extra_pip_extras: plugin-v2,test-airflow24
+ extra_pip_extras: test-airflow24
- python-version: "3.10"
extra_pip_requirements: "apache-airflow~=2.6.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.10.txt"
- extra_pip_extras: plugin-v2
- python-version: "3.10"
extra_pip_requirements: "apache-airflow~=2.7.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt"
- extra_pip_extras: plugin-v2
- python-version: "3.10"
extra_pip_requirements: "apache-airflow~=2.8.1 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt"
- extra_pip_extras: plugin-v2
- python-version: "3.11"
extra_pip_requirements: "apache-airflow~=2.9.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt"
- extra_pip_extras: plugin-v2
- python-version: "3.11"
- extra_pip_requirements: "apache-airflow~=2.10.2 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.11.txt"
- extra_pip_extras: plugin-v2
+ extra_pip_requirements: "apache-airflow~=2.10.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.3/constraints-3.11.txt"
fail-fast: false
steps:
- name: Set up JDK 17
diff --git a/datahub-web-react/src/app/entity/shared/containers/profile/sidebar/Container/ContainerSelectModal.tsx b/datahub-web-react/src/app/entity/shared/containers/profile/sidebar/Container/ContainerSelectModal.tsx
index 681f89831b92c4..818d75c37696dc 100644
--- a/datahub-web-react/src/app/entity/shared/containers/profile/sidebar/Container/ContainerSelectModal.tsx
+++ b/datahub-web-react/src/app/entity/shared/containers/profile/sidebar/Container/ContainerSelectModal.tsx
@@ -5,6 +5,8 @@ import { useGetSearchResultsLazyQuery } from '../../../../../../../graphql/searc
import { Container, Entity, EntityType } from '../../../../../../../types.generated';
import { useEnterKeyListener } from '../../../../../../shared/useEnterKeyListener';
import { useEntityRegistry } from '../../../../../../useEntityRegistry';
+import { getParentEntities } from '../../../../../../search/filters/utils';
+import ParentEntities from '../../../../../../search/filters/ParentEntities';
type Props = {
onCloseModal: () => void;
@@ -26,7 +28,7 @@ const StyleTag = styled(Tag)`
align-items: center;
`;
-const PreviewImage = styled.img`
+export const PreviewImage = styled.img`
max-height: 18px;
width: auto;
object-fit: contain;
@@ -34,6 +36,10 @@ const PreviewImage = styled.img`
margin-right: 4px;
`;
+export const ParentWrapper = styled.div`
+ max-width: 400px;
+`;
+
export const ContainerSelectModal = ({ onCloseModal, defaultValues, onOkOverride, titleOverride }: Props) => {
const [containerSearch, { data: platforSearchData }] = useGetSearchResultsLazyQuery();
const entityRegistry = useEntityRegistry();
@@ -65,10 +71,16 @@ export const ContainerSelectModal = ({ onCloseModal, defaultValues, onOkOverride
// Renders a search result in the select dropdown.
const renderSearchResult = (entity: Container) => {
const displayName = entityRegistry.getDisplayName(EntityType.Container, entity);
+ const parentEntities: Entity[] = getParentEntities(entity as Entity) || [];
const truncatedDisplayName = displayName.length > 25 ? `${displayName.slice(0, 25)}...` : displayName;
return (
+ {parentEntities.length > 0 && (
+ <ParentWrapper>
+ <ParentEntities parentEntities={parentEntities} />
+ </ParentWrapper>
+ )}
{truncatedDisplayName}
diff --git a/datahub-web-react/src/app/search/SearchFilterLabel.tsx b/datahub-web-react/src/app/search/SearchFilterLabel.tsx
index 5a0e75cc2ae1ce..2ed2d4608de0d2 100644
--- a/datahub-web-react/src/app/search/SearchFilterLabel.tsx
+++ b/datahub-web-react/src/app/search/SearchFilterLabel.tsx
@@ -23,6 +23,9 @@ import CustomAvatar from '../shared/avatar/CustomAvatar';
import { IconStyleType } from '../entity/Entity';
import { formatNumber } from '../shared/formatNumber';
import useGetBrowseV2LabelOverride from './filters/useGetBrowseV2LabelOverride';
+import { getParentEntities } from './filters/utils';
+import { ParentWrapper } from '../entity/shared/containers/profile/sidebar/Container/ContainerSelectModal';
+import ParentEntities from './filters/ParentEntities';
type Props = {
field: string;
@@ -157,11 +160,17 @@ export const SearchFilterLabel = ({ field, value, entity, count, hideCount }: Pr
if (entity?.type === EntityType.Container) {
const container = entity as Container;
const displayName = entityRegistry.getDisplayName(EntityType.Container, container);
+ const parentEntities: Entity[] = getParentEntities(container as Entity) || [];
const truncatedDisplayName = displayName.length > 25 ? `${displayName.slice(0, 25)}...` : displayName;
return (
{!!container.platform?.properties?.logoUrl && (
- <PreviewImage src={container.platform?.properties?.logoUrl} alt={displayName} />
+ <>
+ <PreviewImage src={container.platform?.properties?.logoUrl} alt={displayName} />
+ <ParentWrapper>
+ <ParentEntities parentEntities={parentEntities} />
+ </ParentWrapper>
+ </>
)}
{truncatedDisplayName}
diff --git a/datahub-web-react/src/app/search/SimpleSearchFilters.tsx b/datahub-web-react/src/app/search/SimpleSearchFilters.tsx
index 416b04403723f5..877efb55fcf828 100644
--- a/datahub-web-react/src/app/search/SimpleSearchFilters.tsx
+++ b/datahub-web-react/src/app/search/SimpleSearchFilters.tsx
@@ -4,7 +4,12 @@ import { FacetFilterInput, FacetMetadata } from '../../types.generated';
import { FilterScenarioType } from './filters/render/types';
import { useFilterRendererRegistry } from './filters/render/useFilterRenderer';
import { SimpleSearchFilter } from './SimpleSearchFilter';
-import { ENTITY_FILTER_NAME, ENTITY_INDEX_FILTER_NAME, LEGACY_ENTITY_FILTER_NAME } from './utils/constants';
+import {
+ DEGREE_FILTER_NAME,
+ ENTITY_FILTER_NAME,
+ ENTITY_INDEX_FILTER_NAME,
+ LEGACY_ENTITY_FILTER_NAME,
+} from './utils/constants';
const TOP_FILTERS = ['degree', ENTITY_FILTER_NAME, 'platform', 'tags', 'glossaryTerms', 'domains', 'owners'];
@@ -43,6 +48,15 @@ export const SimpleSearchFilters = ({ facets, selectedFilters, onFilterSelect, l
: filter,
)
.filter((filter) => filter.field !== field || !(filter.values?.length === 0));
+
+ // Do not let user unselect all degree filters
+ if (field === DEGREE_FILTER_NAME && !selected) {
+ const hasDegreeFilter = newFilters.find((filter) => filter.field === DEGREE_FILTER_NAME);
+ if (!hasDegreeFilter) {
+ return;
+ }
+ }
+
setCachedProps({ ...cachedProps, selectedFilters: newFilters });
onFilterSelect(newFilters);
};
diff --git a/datahub-web-react/src/app/search/filters/FilterOption.tsx b/datahub-web-react/src/app/search/filters/FilterOption.tsx
index 3749f44cbf6718..50b78c7f0685c9 100644
--- a/datahub-web-react/src/app/search/filters/FilterOption.tsx
+++ b/datahub-web-react/src/app/search/filters/FilterOption.tsx
@@ -8,6 +8,7 @@ import { generateColor } from '../../entity/shared/components/styled/StyledTag';
import { ANTD_GRAY } from '../../entity/shared/constants';
import { useEntityRegistry } from '../../useEntityRegistry';
import {
+ CONTAINER_FILTER_NAME,
ENTITY_SUB_TYPE_FILTER_NAME,
MAX_COUNT_VAL,
PLATFORM_FILTER_NAME,
@@ -125,7 +126,7 @@ export default function FilterOption({
const { field, value, count, entity } = filterOption;
const entityRegistry = useEntityRegistry();
const { icon, label } = getFilterIconAndLabel(field, value, entityRegistry, entity || null, 14);
- const shouldShowIcon = field === PLATFORM_FILTER_NAME && icon !== null;
+ const shouldShowIcon = (field === PLATFORM_FILTER_NAME || field === CONTAINER_FILTER_NAME) && icon !== null;
const shouldShowTagColor = field === TAGS_FILTER_NAME && entity?.type === EntityType.Tag;
const isSubTypeFilter = field === TYPE_NAMES_FILTER_NAME;
const parentEntities: Entity[] = getParentEntities(entity as Entity) || [];
diff --git a/datahub-web-react/src/app/search/filters/utils.tsx b/datahub-web-react/src/app/search/filters/utils.tsx
index f115277a049674..bd747777d11175 100644
--- a/datahub-web-react/src/app/search/filters/utils.tsx
+++ b/datahub-web-react/src/app/search/filters/utils.tsx
@@ -20,6 +20,7 @@ import {
FacetFilterInput,
FacetMetadata,
GlossaryTerm,
+ Container,
} from '../../../types.generated';
import { IconStyleType } from '../../entity/Entity';
import {
@@ -186,6 +187,15 @@ export function getFilterIconAndLabel(
entityRegistry.getIcon(EntityType.DataPlatform, size || 12, IconStyleType.ACCENT, ANTD_GRAY[9])
);
label = filterEntity ? entityRegistry.getDisplayName(EntityType.DataPlatform, filterEntity) : filterValue;
+ } else if (filterField === CONTAINER_FILTER_NAME) {
+ // Scenario where the filter entity exists and filterField is container
+ const logoUrl = (filterEntity as Container)?.platform?.properties?.logoUrl;
+ icon = logoUrl ? (
+ <PreviewImage src={logoUrl} alt="logo" />
+ ) : (
+ entityRegistry.getIcon(EntityType.DataPlatform, size || 12, IconStyleType.ACCENT, ANTD_GRAY[9])
+ );
+ label = entityRegistry.getDisplayName(EntityType.Container, filterEntity);
} else if (filterField === BROWSE_PATH_V2_FILTER_NAME) {
icon = ;
label = getLastBrowseEntryFromFilterValue(filterValue);
@@ -196,6 +206,7 @@ export function getFilterIconAndLabel(
filterEntity,
size,
);
+
icon = newIcon;
label = newLabel;
} else {
@@ -344,6 +355,9 @@ export function getParentEntities(entity: Entity): Entity[] | null {
if (entity.type === EntityType.Domain) {
return (entity as Domain).parentDomains?.domains || [];
}
+ if (entity.type === EntityType.Container) {
+ return (entity as Container).parentContainers?.containers || [];
+ }
return null;
}
diff --git a/datahub-web-react/src/graphql/fragments.graphql b/datahub-web-react/src/graphql/fragments.graphql
index 67dbdbbb22f309..ade63f151d1a09 100644
--- a/datahub-web-react/src/graphql/fragments.graphql
+++ b/datahub-web-react/src/graphql/fragments.graphql
@@ -1010,6 +1010,7 @@ fragment entityContainer on Container {
fragment parentContainerFields on Container {
urn
+ type
properties {
name
}
diff --git a/datahub-web-react/src/graphql/search.graphql b/datahub-web-react/src/graphql/search.graphql
index 4a10d5fe250de1..3e26dd7121b72c 100644
--- a/datahub-web-react/src/graphql/search.graphql
+++ b/datahub-web-react/src/graphql/search.graphql
@@ -910,6 +910,9 @@ fragment facetFields on FacetMetadata {
properties {
name
}
+ parentContainers {
+ ...parentContainersFields
+ }
}
... on CorpUser {
username
diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md
index 087e30c2e541ad..bcc89332cc1c1b 100644
--- a/docs/how/updating-datahub.md
+++ b/docs/how/updating-datahub.md
@@ -35,6 +35,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting directly within `sources_to_platform_instance..database`.
- #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information.
+- #12056: The DataHub Airflow plugin no longer supports Airflow 2.1 and Airflow 2.2.
+- #12056: The DataHub Airflow plugin now defaults to the v2 plugin implementation.
### Breaking Changes
@@ -46,7 +48,14 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #11619 - schema field/column paths can no longer be duplicated within the schema
- #11570 - The `DatahubClientConfig`'s server field no longer defaults to `http://localhost:8080`. Be sure to explicitly set this.
- #11570 - If a `datahub_api` is explicitly passed to a stateful ingestion config provider, it will be used. We previously ignored it if the pipeline context also had a graph object.
-- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted (after 10d) or are timeseries *entities* (dataprocess, execution requests) will be removed automatically using logic in the `datahub-gc` ingestion source.
+- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted
+ (after 10d) or are timeseries *entities* (dataprocess, execution requests)
+ will be removed automatically using logic in the `datahub-gc` ingestion
+ source.
+- #12067 - Default behavior of DataJobPatchBuilder in the Python SDK has been
+ changed to NOT fill out `created` and `lastModified` auditstamps by default
+ for input and output dataset edges. This should not have any user-observable
+ impact (time-based lineage visualization will continue to work based on observed time), but could break assumptions previously being made by clients.
### Potential Downtime
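
For illustration, a minimal sketch of how a client can keep the pre-#12067 behavior by minting audit stamps explicitly. The URNs and timestamp here are made up; the APIs (`DataJobPatchBuilder`, `EdgeClass`, `AuditStampClass`) are the ones touched in this PR.

```python
# Illustrative sketch: passing a bare URN now produces an edge with no audit
# stamps; to keep the old behavior, pass an explicit Edge with stamps.
from datahub.metadata.schema_classes import AuditStampClass, EdgeClass
from datahub.specific.datajob import DataJobPatchBuilder

patcher = DataJobPatchBuilder(
    "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)"
)

# New default: no created/lastModified stamps on the edge.
patcher.add_output_dataset(
    "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)"
)

# Old behavior, restored explicitly (timestamp is illustrative).
stamp = AuditStampClass(time=1586847600000, actor="urn:li:corpuser:datahub")
patcher.add_output_dataset(
    EdgeClass(
        destinationUrn="urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)",
        created=stamp,
        lastModified=stamp,
    )
)

for mcp in patcher.build():
    print(mcp)
```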
diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md
index 829c048a8f8e24..2bd58334933fb7 100644
--- a/docs/lineage/airflow.md
+++ b/docs/lineage/airflow.md
@@ -13,14 +13,14 @@ The DataHub Airflow plugin supports:
- Task run information, including task successes and failures.
- Manual lineage annotations using `inlets` and `outlets` on Airflow operators.
-There's two actively supported implementations of the plugin, with different Airflow version support.
+There are two implementations of the plugin, with different Airflow version support.
-| Approach | Airflow Version | Notes |
-| --------- | --------------- | --------------------------------------------------------------------------- |
-| Plugin v2 | 2.3.4+ | Recommended. Requires Python 3.8+ |
-| Plugin v1 | 2.1 - 2.8 | No automatic lineage extraction; may not extract lineage if the task fails. |
+| Approach | Airflow Versions | Notes |
+| --------- | ---------------- | --------------------------------------------------------------------------------------- |
+| Plugin v2 | 2.3.4+ | Recommended. Requires Python 3.8+ |
+| Plugin v1 | 2.3 - 2.8 | Deprecated. No automatic lineage extraction; may not extract lineage if the task fails. |
-If you're using Airflow older than 2.1, it's possible to use the v1 plugin with older versions of `acryl-datahub-airflow-plugin`. See the [compatibility section](#compatibility) for more details.
+If you're using Airflow older than 2.3, it's possible to use the v1 plugin with older versions of `acryl-datahub-airflow-plugin`. See the [compatibility section](#compatibility) for more details.
@@ -29,7 +29,7 @@ If you're using Airflow older than 2.1, it's possible to use the v1 plugin with
### Installation
-The v2 plugin requires Airflow 2.3+ and Python 3.8+. If you don't meet these requirements, use the v1 plugin instead.
+The v2 plugin requires Airflow 2.3+ and Python 3.8+. If you don't meet these requirements, see the [compatibility section](#compatibility) for other options.
```shell
pip install 'acryl-datahub-airflow-plugin[plugin-v2]'
@@ -84,9 +84,10 @@ enabled = True # default
### Installation
-The v1 plugin requires Airflow 2.1 - 2.8 and Python 3.8+. If you're on older versions, it's still possible to use an older version of the plugin. See the [compatibility section](#compatibility) for more details.
+The v1 plugin requires Airflow 2.3 - 2.8 and Python 3.8+. If you're on older versions, it's still possible to use an older version of the plugin. See the [compatibility section](#compatibility) for more details.
-If you're using Airflow 2.3+, we recommend using the v2 plugin instead. If you need to use the v1 plugin with Airflow 2.3+, you must also set the environment variable `DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN=true`.
+Note that the v1 plugin has fewer features than the v2 plugin and is no longer actively maintained.
+Since DataHub v0.15.0, the v2 plugin has been the default. If you need to use the v1 plugin with `acryl-datahub-airflow-plugin` v0.15.0+, you must also set the environment variable `DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN=true`.
```shell
pip install 'acryl-datahub-airflow-plugin[plugin-v1]'
@@ -340,11 +341,12 @@ The solution is to upgrade `acryl-datahub-airflow-plugin>=0.12.0.4` or upgrade `
## Compatibility
-We no longer officially support Airflow <2.1. However, you can use older versions of `acryl-datahub-airflow-plugin` with older versions of Airflow.
-Both of these options support Python 3.7+.
+We no longer officially support Airflow <2.3. However, you can use older versions of `acryl-datahub-airflow-plugin` with older versions of Airflow.
+The first two options support Python 3.7+, and the last option supports Python 3.8+.
- Airflow 1.10.x, use DataHub plugin v1 with acryl-datahub-airflow-plugin <= 0.9.1.0.
- Airflow 2.0.x, use DataHub plugin v1 with acryl-datahub-airflow-plugin <= 0.11.0.1.
+- Airflow 2.2.x, use DataHub plugin v1 with acryl-datahub-airflow-plugin <= 0.14.1.5.
DataHub also previously supported an Airflow [lineage backend](https://airflow.apache.org/docs/apache-airflow/2.2.0/lineage.html#lineage-backend) implementation. While the implementation is still in our codebase, it is deprecated and will be removed in a future release.
Note that the lineage backend did not support automatic lineage extraction, did not capture task failures, and did not work in AWS MWAA.
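
A hedged sketch of the opt-in gate described above; the plugin's actual internals may differ, but the environment variable is the one documented in this PR.

```python
# Hypothetical illustration of the DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN gate;
# the real plugin's source may differ.
import os

def use_v1_plugin() -> bool:
    # v2 is the default since v0.15.0; v1 requires an explicit opt-in.
    return os.getenv("DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN", "false").lower() == "true"
```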
diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java
index 30f5dce379a077..6ce6a9a5730385 100644
--- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java
+++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java
@@ -28,10 +28,12 @@
public interface AspectsBatch {
Collection<? extends BatchItem> getItems();
+ Collection<? extends BatchItem> getInitialItems();
+
RetrieverContext getRetrieverContext();
/**
- * Returns MCP items. Could be patch, upsert, etc.
+ * Returns MCP items. Could be one of patch, upsert, etc.
*
* @return batch items
*/
@@ -160,13 +162,24 @@ static Stream<MCLItem> applyMCLSideEffects(
}
default boolean containsDuplicateAspects() {
- return getItems().stream()
- .map(i -> String.format("%s_%s", i.getClass().getName(), i.hashCode()))
+ return getInitialItems().stream()
+ .map(i -> String.format("%s_%s", i.getClass().getSimpleName(), i.hashCode()))
.distinct()
.count()
!= getItems().size();
}
+ default Map<String, List<? extends BatchItem>> duplicateAspects() {
+ return getInitialItems().stream()
+ .collect(
+ Collectors.groupingBy(
+ i -> String.format("%s_%s", i.getClass().getSimpleName(), i.hashCode())))
+ .entrySet()
+ .stream()
+ .filter(entry -> entry.getValue() != null && entry.getValue().size() > 1)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
default Map<String, Set<String>> getUrnAspectsMap() {
return getItems().stream()
.map(aspect -> Pair.of(aspect.getUrn().toString(), aspect.getAspectName()))
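
For readers less familiar with Java streams, the grouping logic of the new `duplicateAspects()` default method restated in Python. This is purely illustrative, not code from this PR.

```python
# Python restatement of duplicateAspects(): bucket the initial batch items by
# a "ClassName_hashCode" key and report only buckets with more than one entry.
from collections import defaultdict
from typing import Any, Dict, List

def duplicate_aspects(items: List[Any]) -> Dict[str, List[Any]]:
    groups: Dict[str, List[Any]] = defaultdict(list)
    for item in items:
        groups[f"{type(item).__name__}_{hash(item)}"].append(item)
    return {key: group for key, group in groups.items() if len(group) > 1}
```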
diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/BatchItem.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/BatchItem.java
index a6dfbc277e12ec..7f0a849a0eda1d 100644
--- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/BatchItem.java
+++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/BatchItem.java
@@ -23,4 +23,11 @@ public interface BatchItem extends ReadItem {
*/
@Nonnull
ChangeType getChangeType();
+
+ /**
+ * Determines if this item is a duplicate of another item in terms of the operation it represents
+ * to the database. Each implementation can define what constitutes a duplicate based on its
+ * specific fields which are persisted.
+ */
+ boolean isDatabaseDuplicateOf(BatchItem other);
}
diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/registry/PluginEntityRegistryLoader.java b/entity-registry/src/main/java/com/linkedin/metadata/models/registry/PluginEntityRegistryLoader.java
index 4f2e5a106ae792..531537852109b0 100644
--- a/entity-registry/src/main/java/com/linkedin/metadata/models/registry/PluginEntityRegistryLoader.java
+++ b/entity-registry/src/main/java/com/linkedin/metadata/models/registry/PluginEntityRegistryLoader.java
@@ -6,7 +6,6 @@
import com.linkedin.metadata.models.registry.config.LoadStatus;
import com.linkedin.util.Pair;
import java.io.File;
-import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.file.Files;
@@ -204,8 +203,8 @@ private void loadOneRegistry(
loadResultBuilder.plugins(entityRegistry.getPluginFactory().getPluginLoadResult());
log.info("Loaded registry {} successfully", entityRegistry);
- } catch (RuntimeException | EntityRegistryException | IOException e) {
- log.debug("{}: Failed to load registry {} with {}", this, registryName, e.getMessage());
+ } catch (Exception | EntityRegistryException e) {
+ log.error("{}: Failed to load registry {} with {}", this, registryName, e.getMessage(), e);
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
diff --git a/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCL.java b/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCL.java
index 7dd889c48b8747..6643a9de58562b 100644
--- a/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCL.java
+++ b/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCL.java
@@ -4,10 +4,12 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
+import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.MCLItem;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.MetadataChangeLog;
+import java.util.Objects;
import javax.annotation.Nonnull;
import lombok.Builder;
import lombok.Getter;
@@ -29,4 +31,23 @@ public class TestMCL implements MCLItem {
public String getAspectName() {
return getAspectSpec().getName();
}
+
+ @Override
+ public boolean isDatabaseDuplicateOf(BatchItem other) {
+ return equals(other);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TestMCL testMCL = (TestMCL) o;
+ return Objects.equals(metadataChangeLog, testMCL.metadataChangeLog);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(metadataChangeLog);
+ }
}
diff --git a/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCP.java b/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCP.java
index e562390a959a74..5b714bdbf0b478 100644
--- a/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCP.java
+++ b/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCP.java
@@ -6,6 +6,7 @@
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
+import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.ReadItem;
@@ -21,6 +22,7 @@
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -140,4 +142,40 @@ public Map<String, String> getHeaders() {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.orElse(headers);
}
+
+ @Override
+ public boolean isDatabaseDuplicateOf(BatchItem other) {
+ return equals(other);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TestMCP testMCP = (TestMCP) o;
+ return urn.equals(testMCP.urn)
+ && DataTemplateUtil.areEqual(recordTemplate, testMCP.recordTemplate)
+ && Objects.equals(systemAspect, testMCP.systemAspect)
+ && Objects.equals(previousSystemAspect, testMCP.previousSystemAspect)
+ && Objects.equals(auditStamp, testMCP.auditStamp)
+ && Objects.equals(changeType, testMCP.changeType)
+ && Objects.equals(metadataChangeProposal, testMCP.metadataChangeProposal);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = urn.hashCode();
+ result = 31 * result + Objects.hashCode(recordTemplate);
+ result = 31 * result + Objects.hashCode(systemAspect);
+ result = 31 * result + Objects.hashCode(previousSystemAspect);
+ result = 31 * result + Objects.hashCode(auditStamp);
+ result = 31 * result + Objects.hashCode(changeType);
+ result = 31 * result + Objects.hashCode(metadataChangeProposal);
+ return result;
+ }
}
diff --git a/metadata-events/mxe-schemas/build.gradle b/metadata-events/mxe-schemas/build.gradle
index ab0ea8b649e9d4..6dfe69a420242f 100644
--- a/metadata-events/mxe-schemas/build.gradle
+++ b/metadata-events/mxe-schemas/build.gradle
@@ -25,7 +25,7 @@ task copyOriginalAvsc(type: Copy, dependsOn: generateAvroSchema) {
}
task renameNamespace(type: Exec, dependsOn: copyOriginalAvsc) {
- commandLine 'sh', './rename-namespace.sh'
+ commandLine 'bash', './rename-namespace.sh'
}
build.dependsOn renameNamespace
@@ -34,4 +34,4 @@ clean {
project.delete('src/main/pegasus')
project.delete('src/mainGeneratedAvroSchema/avro')
project.delete('src/renamed/avro')
-}
\ No newline at end of file
+}
diff --git a/metadata-ingestion-modules/airflow-plugin/build.gradle b/metadata-ingestion-modules/airflow-plugin/build.gradle
index f30858ba6a14ef..68a35c0dfc417b 100644
--- a/metadata-ingestion-modules/airflow-plugin/build.gradle
+++ b/metadata-ingestion-modules/airflow-plugin/build.gradle
@@ -13,7 +13,7 @@ if (!project.hasProperty("extra_pip_requirements")) {
ext.extra_pip_requirements = ""
}
if (!project.hasProperty("extra_pip_extras")) {
- ext.extra_pip_extras = "plugin-v2"
+ ext.extra_pip_extras = ""
}
// If extra_pip_extras is non-empty, we need to add a comma to the beginning of the string.
if (extra_pip_extras != "") {
diff --git a/metadata-ingestion-modules/airflow-plugin/setup.py b/metadata-ingestion-modules/airflow-plugin/setup.py
index 02a0bbb6022e04..3209233184d55a 100644
--- a/metadata-ingestion-modules/airflow-plugin/setup.py
+++ b/metadata-ingestion-modules/airflow-plugin/setup.py
@@ -24,8 +24,8 @@ def get_long_description():
base_requirements = {
f"acryl-datahub[datahub-rest]{_self_pin}",
- # Actual dependencies.
- "apache-airflow >= 2.0.2",
+ # We require Airflow 2.3.x, since we need the new DAG listener API.
+ "apache-airflow>=2.3.0",
}
plugins: Dict[str, Set[str]] = {
@@ -44,12 +44,13 @@ def get_long_description():
# We remain restrictive on the versions allowed here to prevent
# us from being broken by backwards-incompatible changes in the
# underlying package.
- "openlineage-airflow>=1.2.0,<=1.22.0",
+ "openlineage-airflow>=1.2.0,<=1.25.0",
},
}
-# Include datahub-rest in the base requirements.
+# Require some plugins by default.
base_requirements.update(plugins["datahub-rest"])
+base_requirements.update(plugins["plugin-v2"])
mypy_stubs = {
@@ -109,6 +110,11 @@ def get_long_description():
"apache-airflow-providers-sqlite",
}
per_version_test_requirements = {
+ "test-airflow23": {
+ "pendulum<3.0",
+ "Flask-Session<0.6.0",
+ "connexion<3.0",
+ },
"test-airflow24": {
"pendulum<3.0",
"Flask-Session<0.6.0",
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py
index c1e2dd4cc422d0..d86a46e042e8f8 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py
@@ -46,7 +46,7 @@ def get_task_inlets(operator: "Operator") -> List:
return operator._inlets # type: ignore[attr-defined, union-attr]
if hasattr(operator, "get_inlet_defs"):
return operator.get_inlet_defs() # type: ignore[attr-defined]
- return operator.inlets
+ return operator.inlets or []
def get_task_outlets(operator: "Operator") -> List:
@@ -56,7 +56,7 @@ def get_task_outlets(operator: "Operator") -> List:
return operator._outlets # type: ignore[attr-defined, union-attr]
if hasattr(operator, "get_outlet_defs"):
return operator.get_outlet_defs()
- return operator.outlets
+ return operator.outlets or []
__all__ = [
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
index e00cf51ea456cc..aa7b3108f64f1e 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
@@ -74,7 +74,7 @@ def hookimpl(f: _F) -> _F: # type: ignore[misc] # noqa: F811
"1",
)
_RUN_IN_THREAD_TIMEOUT = float(
- os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT", 15)
+ os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT", 10)
)
_DATAHUB_CLEANUP_DAG = "Datahub_Cleanup"
@@ -102,6 +102,7 @@ def get_airflow_plugin_listener() -> Optional["DataHubListener"]:
"capture_tags": plugin_config.capture_tags_info,
"capture_ownership": plugin_config.capture_ownership_info,
"enable_extractors": plugin_config.enable_extractors,
+ "render_templates": plugin_config.render_templates,
"disable_openlineage_plugin": plugin_config.disable_openlineage_plugin,
},
)
diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_basic_iolets.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_basic_iolets.json
index fd10f858d00fb6..4c21b7ed4000dc 100644
--- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_basic_iolets.json
+++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_basic_iolets.json
@@ -14,7 +14,7 @@
"fileloc": "",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
- "tags": "None",
+ "tags": "[]",
"timezone": "Timezone('UTC')"
},
"externalUrl": "http://airflow.example.com/tree?dag_id=basic_iolets",
@@ -83,7 +83,7 @@
"execution_timeout": "None",
"sla": "None",
"task_id": "'run_data_task'",
- "trigger_rule": "'all_success'",
+ "trigger_rule": "",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[]",
@@ -246,6 +246,46 @@
}
}
},
+{
+ "entityType": "dataJob",
+ "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
+ "changeType": "UPSERT",
+ "aspectName": "dataJobInfo",
+ "aspect": {
+ "json": {
+ "customProperties": {
+ "depends_on_past": "False",
+ "email": "None",
+ "label": "'run_data_task'",
+ "execution_timeout": "None",
+ "sla": "None",
+ "task_id": "'run_data_task'",
+ "trigger_rule": "",
+ "wait_for_downstream": "False",
+ "downstream_task_ids": "[]",
+ "inlets": "[]",
+ "outlets": "[]"
+ },
+ "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task",
+ "name": "run_data_task",
+ "type": {
+ "string": "COMMAND"
+ },
+ "env": "PROD"
+ }
+ }
+},
+{
+ "entityType": "dataJob",
+ "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
+ "changeType": "UPSERT",
+ "aspectName": "status",
+ "aspect": {
+ "json": {
+ "removed": false
+ }
+ }
+},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
@@ -402,16 +442,16 @@
"state": "success",
"operator": "BashOperator",
"priority_weight": "1",
- "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets",
+ "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1",
"orchestrator": "airflow",
"dag_id": "basic_iolets",
"task_id": "run_data_task"
},
- "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets",
+ "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1",
"name": "basic_iolets_run_data_task_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
- "time": 1717180290951,
+ "time": 1733529136396,
"actor": "urn:li:corpuser:datahub"
}
}
@@ -544,7 +584,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
- "timestampMillis": 1717180290951,
+ "timestampMillis": 1733529136396,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
@@ -561,7 +601,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
- "timestampMillis": 1717180291140,
+ "timestampMillis": 1733529137385,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_simple_dag.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_simple_dag.json
index 5c5be6848fd839..b6ab1ff9120f25 100644
--- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_simple_dag.json
+++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_simple_dag.json
@@ -14,7 +14,7 @@
"fileloc": "",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
- "tags": "None",
+ "tags": "[]",
"timezone": "Timezone('UTC')"
},
"externalUrl": "http://airflow.example.com/tree?dag_id=simple_dag",
@@ -84,7 +84,7 @@
"execution_timeout": "None",
"sla": "None",
"task_id": "'task_1'",
- "trigger_rule": "'all_success'",
+ "trigger_rule": "",
"wait_for_downstream": "False",
"downstream_task_ids": "['run_another_data_task']",
"inlets": "[]",
@@ -205,6 +205,46 @@
}
}
},
+{
+ "entityType": "dataJob",
+ "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
+ "changeType": "UPSERT",
+ "aspectName": "dataJobInfo",
+ "aspect": {
+ "json": {
+ "customProperties": {
+ "depends_on_past": "False",
+ "email": "None",
+ "label": "'task_1'",
+ "execution_timeout": "None",
+ "sla": "None",
+ "task_id": "'task_1'",
+ "trigger_rule": "",
+ "wait_for_downstream": "False",
+ "downstream_task_ids": "['run_another_data_task']",
+ "inlets": "[]",
+ "outlets": "[]"
+ },
+ "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=task_1",
+ "name": "task_1",
+ "type": {
+ "string": "COMMAND"
+ },
+ "env": "PROD"
+ }
+ }
+},
+{
+ "entityType": "dataJob",
+ "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
+ "changeType": "UPSERT",
+ "aspectName": "status",
+ "aspect": {
+ "json": {
+ "removed": false
+ }
+ }
+},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
@@ -319,16 +359,16 @@
"state": "success",
"operator": "BashOperator",
"priority_weight": "2",
- "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag",
+ "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1",
"orchestrator": "airflow",
"dag_id": "simple_dag",
"task_id": "task_1"
},
- "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag",
+ "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1",
"name": "simple_dag_task_1_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
- "time": 1717180227827,
+ "time": 1733528983395,
"actor": "urn:li:corpuser:datahub"
}
}
@@ -419,7 +459,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
- "timestampMillis": 1717180227827,
+ "timestampMillis": 1733528983395,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
@@ -436,7 +476,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
- "timestampMillis": 1717180228022,
+ "timestampMillis": 1733528984355,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
@@ -449,6 +489,42 @@
}
}
},
+{
+ "entityType": "dataFlow",
+ "entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
+ "changeType": "UPSERT",
+ "aspectName": "dataFlowInfo",
+ "aspect": {
+ "json": {
+ "customProperties": {
+ "_access_control": "None",
+ "catchup": "False",
+ "description": "'A simple DAG that runs a few fake data tasks.'",
+ "doc_md": "None",
+ "fileloc": "",
+ "is_paused_upon_creation": "None",
+ "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
+ "tags": "[]",
+ "timezone": "Timezone('UTC')"
+ },
+ "externalUrl": "http://airflow.example.com/tree?dag_id=simple_dag",
+ "name": "simple_dag",
+ "description": "A simple DAG that runs a few fake data tasks.",
+ "env": "PROD"
+ }
+ }
+},
+{
+ "entityType": "dataFlow",
+ "entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
+ "changeType": "UPSERT",
+ "aspectName": "status",
+ "aspect": {
+ "json": {
+ "removed": false
+ }
+ }
+},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
@@ -498,7 +574,7 @@
"execution_timeout": "None",
"sla": "None",
"task_id": "'run_another_data_task'",
- "trigger_rule": "'all_success'",
+ "trigger_rule": "",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[]",
@@ -575,6 +651,46 @@
}
}
},
+{
+ "entityType": "dataJob",
+ "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)",
+ "changeType": "UPSERT",
+ "aspectName": "dataJobInfo",
+ "aspect": {
+ "json": {
+ "customProperties": {
+ "depends_on_past": "False",
+ "email": "None",
+ "label": "'run_another_data_task'",
+ "execution_timeout": "None",
+ "sla": "None",
+ "task_id": "'run_another_data_task'",
+ "trigger_rule": "",
+ "wait_for_downstream": "False",
+ "downstream_task_ids": "[]",
+ "inlets": "[]",
+ "outlets": "[]"
+ },
+ "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=run_another_data_task",
+ "name": "run_another_data_task",
+ "type": {
+ "string": "COMMAND"
+ },
+ "env": "PROD"
+ }
+ }
+},
+{
+ "entityType": "dataJob",
+ "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)",
+ "changeType": "UPSERT",
+ "aspectName": "status",
+ "aspect": {
+ "json": {
+ "removed": false
+ }
+ }
+},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)",
@@ -645,16 +761,16 @@
"state": "success",
"operator": "BashOperator",
"priority_weight": "1",
- "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag",
+ "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1",
"orchestrator": "airflow",
"dag_id": "simple_dag",
"task_id": "run_another_data_task"
},
- "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag",
+ "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1",
"name": "simple_dag_run_another_data_task_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
- "time": 1717180231676,
+ "time": 1733528992448,
"actor": "urn:li:corpuser:datahub"
}
}
@@ -679,7 +795,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
- "timestampMillis": 1717180231676,
+ "timestampMillis": 1733528992448,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
@@ -696,7 +812,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
- "timestampMillis": 1717180231824,
+ "timestampMillis": 1733528993380,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py
index 75bb43af1a43dd..3b2c9140e4632f 100644
--- a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py
+++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py
@@ -12,6 +12,7 @@
import time
from typing import Any, Iterator, Sequence
+import packaging.version
import pytest
import requests
import tenacity
@@ -20,6 +21,7 @@
from datahub.testing.compare_metadata_json import assert_metadata_files_equal
from datahub_airflow_plugin._airflow_shims import (
+ AIRFLOW_VERSION,
HAS_AIRFLOW_DAG_LISTENER_API,
HAS_AIRFLOW_LISTENER_API,
HAS_AIRFLOW_STANDALONE_CMD,
@@ -242,6 +244,7 @@ def _run_airflow(
# Note that we could also disable the RUN_IN_THREAD entirely,
# but I want to minimize the difference between CI and prod.
"DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT": "30",
+ "DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN": "true" if is_v1 else "false",
# Convenience settings.
"AIRFLOW__DATAHUB__LOG_LEVEL": "DEBUG",
"AIRFLOW__DATAHUB__DEBUG_EMITTER": "True",
@@ -361,7 +364,6 @@ class DagTestCase:
@pytest.mark.parametrize(
["golden_filename", "test_case", "is_v1"],
[
- # On Airflow <= 2.2, test plugin v1.
*[
pytest.param(
f"v1_{test_case.dag_id}",
@@ -369,8 +371,8 @@ class DagTestCase:
True,
id=f"v1_{test_case.dag_id}",
marks=pytest.mark.skipif(
- HAS_AIRFLOW_LISTENER_API,
- reason="Not testing plugin v1 on newer Airflow versions",
+ AIRFLOW_VERSION >= packaging.version.parse("2.4.0"),
+ reason="We only test the v1 plugin on Airflow 2.3",
),
)
for test_case in test_cases
@@ -391,10 +393,18 @@ class DagTestCase:
if HAS_AIRFLOW_DAG_LISTENER_API
else f"v2_{test_case.dag_id}_no_dag_listener"
),
- marks=pytest.mark.skipif(
- not HAS_AIRFLOW_LISTENER_API,
- reason="Cannot test plugin v2 without the Airflow plugin listener API",
- ),
+ marks=[
+ pytest.mark.skipif(
+ not HAS_AIRFLOW_LISTENER_API,
+ reason="Cannot test plugin v2 without the Airflow plugin listener API",
+ ),
+ pytest.mark.skipif(
+ AIRFLOW_VERSION < packaging.version.parse("2.4.0"),
+ reason="We skip testing the v2 plugin on Airflow 2.3 because it causes flakiness in the custom properties. "
+ "Ideally we'd just fix these, but given that Airflow 2.3 is EOL and likely going to be deprecated "
+ "soon anyways, it's not worth the effort.",
+ ),
+ ],
)
for test_case in test_cases
],
diff --git a/metadata-ingestion-modules/airflow-plugin/tox.ini b/metadata-ingestion-modules/airflow-plugin/tox.ini
index 28c0b9532bcb8e..b310ec84248f17 100644
--- a/metadata-ingestion-modules/airflow-plugin/tox.ini
+++ b/metadata-ingestion-modules/airflow-plugin/tox.ini
@@ -4,17 +4,24 @@
# and then run "tox" from this directory.
[tox]
-envlist = py38-airflow21, py38-airflow22, py310-airflow24, py310-airflow26, py310-airflow27, py310-airflow28, py311-airflow29, py311-airflow210
+envlist = py38-airflow23, py310-airflow24, py310-airflow26, py310-airflow27, py310-airflow28, py311-airflow29, py311-airflow210
[testenv]
use_develop = true
-extras = dev,integration-tests,plugin-v1
+extras =
+ dev
+ integration-tests
+ plugin-v1
+ plugin-v2
+ # For Airflow 2.3 and 2.4, add a few extra requirements.
+ airflow23: test-airflow23
+ airflow24: test-airflow24
+
deps =
# This should be kept in sync with the Github Actions matrix.
-e ../../metadata-ingestion/
# Airflow version
- airflow21: apache-airflow~=2.1.0
- airflow22: apache-airflow~=2.2.0
+ airflow23: apache-airflow~=2.3.0
airflow24: apache-airflow~=2.4.0
airflow26: apache-airflow~=2.6.0
airflow27: apache-airflow~=2.7.0
@@ -23,7 +30,8 @@ deps =
airflow210: apache-airflow~=2.10.0
# Respect the Airflow constraints files.
- # We can't make ourselves work with the constraints of Airflow < 2.3.
+ # We can't make ourselves work with the constraints of Airflow <= 2.3.
+ ; py38-airflow23: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.3.4/constraints-3.8.txt
# The Airflow 2.4 constraints file requires a version of the sqlite provider whose
# hook type is missing the `conn_name_attr` property.
; py310-airflow24: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.4.3/constraints-3.10.txt
@@ -31,7 +39,7 @@ deps =
py310-airflow27: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt
py310-airflow28: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt
py311-airflow29: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt
- py311-airflow210: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.11.txt
+ py311-airflow210: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.3/constraints-3.11.txt
# Before pinning to the constraint files, we previously left the dependencies
# more open. There were a number of packages for which this caused issues.
@@ -54,11 +62,3 @@ deps =
; airflow24,airflow26,airflow27,airflow28: Flask-Session<0.6.0
commands =
pytest --cov-append {posargs}
-
-# For Airflow 2.4+, add the plugin-v2 extra.
-[testenv:py310-airflow24]
-extras = dev,integration-tests,plugin-v2,test-airflow24
-
-[testenv:py3{10,11}-airflow{26,27,28,29,210}]
-extras = dev,integration-tests,plugin-v2
-
diff --git a/metadata-ingestion/src/datahub/specific/datajob.py b/metadata-ingestion/src/datahub/specific/datajob.py
index fb7b0ae7816f17..6ff4741b09c26a 100644
--- a/metadata-ingestion/src/datahub/specific/datajob.py
+++ b/metadata-ingestion/src/datahub/specific/datajob.py
@@ -102,7 +102,7 @@ def add_input_datajob(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde
Notes:
If `input` is an Edge object, it is used directly. If `input` is a Urn object or string,
- it is converted to an Edge object and added with default audit stamps.
+ it is converted to an Edge object and added without any audit stamps.
"""
if isinstance(input, Edge):
input_urn: str = input.destinationUrn
@@ -114,8 +114,6 @@ def add_input_datajob(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde
input_edge = Edge(
destinationUrn=input_urn,
- created=self._mint_auditstamp(),
- lastModified=self._mint_auditstamp(),
)
self._ensure_urn_type("dataJob", [input_edge], "add_input_datajob")
@@ -185,7 +183,7 @@ def add_input_dataset(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde
Notes:
If `input` is an Edge object, it is used directly. If `input` is a Urn object or string,
- it is converted to an Edge object and added with default audit stamps.
+ it is converted to an Edge object and added without any audit stamps.
"""
if isinstance(input, Edge):
input_urn: str = input.destinationUrn
@@ -197,8 +195,6 @@ def add_input_dataset(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde
input_edge = Edge(
destinationUrn=input_urn,
- created=self._mint_auditstamp(),
- lastModified=self._mint_auditstamp(),
)
self._ensure_urn_type("dataset", [input_edge], "add_input_dataset")
@@ -270,7 +266,7 @@ def add_output_dataset(
Notes:
If `output` is an Edge object, it is used directly. If `output` is a Urn object or string,
- it is converted to an Edge object and added with default audit stamps.
+ it is converted to an Edge object and added without any audit stamps.
"""
if isinstance(output, Edge):
output_urn: str = output.destinationUrn
@@ -282,15 +278,13 @@ def add_output_dataset(
output_edge = Edge(
destinationUrn=output_urn,
- created=self._mint_auditstamp(),
- lastModified=self._mint_auditstamp(),
)
self._ensure_urn_type("dataset", [output_edge], "add_output_dataset")
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"add",
- path=f"/outputDatasetEdges/{self.quote(str(output))}",
+ path=f"/outputDatasetEdges/{self.quote(output_urn)}",
value=output_edge,
)
return self
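
A short illustrative check of the `path` fix above: when an `Edge` is passed, the JSON-patch path is now built from `output_urn` (the edge's `destinationUrn`, with `/` quoted as `~1`) rather than `str(output)`. The job URN and expected path come from this PR's test fixtures and goldens.

```python
# Illustrative: the patch path now comes from the edge's destinationUrn.
from datahub.metadata.schema_classes import EdgeClass
from datahub.specific.datajob import DataJobPatchBuilder

patcher = DataJobPatchBuilder(
    "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)"
)
patcher.add_output_dataset(
    EdgeClass(
        destinationUrn="urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)"
    )
)
patches = patcher.build()
# Expected JSON-patch path (matches the updated goldens), not str(EdgeClass(...)):
# /outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder1,DEV)
```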
diff --git a/metadata-ingestion/tests/integration/nifi/nifi_mces_golden_standalone.json b/metadata-ingestion/tests/integration/nifi/nifi_mces_golden_standalone.json
index f820efc7399492..e026664a78e0be 100644
--- a/metadata-ingestion/tests/integration/nifi/nifi_mces_golden_standalone.json
+++ b/metadata-ingestion/tests/integration/nifi/nifi_mces_golden_standalone.json
@@ -60,15 +60,7 @@
"op": "add",
"path": "/inputDatajobEdges/urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),91d59f03-1c2b-3f3f-48bc-f89296a328bd)",
"value": {
- "destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),91d59f03-1c2b-3f3f-48bc-f89296a328bd)",
- "created": {
- "time": 1638532800000,
- "actor": "urn:li:corpuser:datahub"
- },
- "lastModified": {
- "time": 1638532800000,
- "actor": "urn:li:corpuser:datahub"
- }
+ "destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),91d59f03-1c2b-3f3f-48bc-f89296a328bd)"
}
}
]
@@ -178,30 +170,14 @@
"op": "add",
"path": "/inputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"value": {
- "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
- "created": {
- "time": 1638532800000,
- "actor": "urn:li:corpuser:datahub"
- },
- "lastModified": {
- "time": 1638532800000,
- "actor": "urn:li:corpuser:datahub"
- }
+ "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)"
}
},
{
"op": "add",
"path": "/inputDatajobEdges/urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),cb7693ed-f93b-3340-3776-fe80e6283ddc)",
"value": {
- "destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),cb7693ed-f93b-3340-3776-fe80e6283ddc)",
- "created": {
- "time": 1638532800000,
- "actor": "urn:li:corpuser:datahub"
- },
- "lastModified": {
- "time": 1638532800000,
- "actor": "urn:li:corpuser:datahub"
- }
+ "destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),cb7693ed-f93b-3340-3776-fe80e6283ddc)"
}
}
]
@@ -287,15 +263,7 @@
"op": "add",
"path": "/inputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"value": {
- "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
- "created": {
- "time": 1638532800000,
- "actor": "urn:li:corpuser:datahub"
- },
- "lastModified": {
- "time": 1638532800000,
- "actor": "urn:li:corpuser:datahub"
- }
+ "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)"
}
}
]
diff --git a/metadata-ingestion/tests/unit/patch/test_patch_builder.py b/metadata-ingestion/tests/unit/patch/test_patch_builder.py
index 267da6cdd5d205..f4bf501e0714d0 100644
--- a/metadata-ingestion/tests/unit/patch/test_patch_builder.py
+++ b/metadata-ingestion/tests/unit/patch/test_patch_builder.py
@@ -1,5 +1,6 @@
import json
import pathlib
+from typing import Any, Dict, Union
import pytest
from freezegun.api import freeze_time
@@ -15,7 +16,9 @@
)
from datahub.ingestion.sink.file import write_metadata_file
from datahub.metadata.schema_classes import (
+ AuditStampClass,
DatasetLineageTypeClass,
+ EdgeClass,
FineGrainedLineageClass,
FineGrainedLineageDownstreamTypeClass,
FineGrainedLineageUpstreamTypeClass,
@@ -182,8 +185,66 @@ def test_basic_dashboard_patch_builder():
]
+@pytest.mark.parametrize(
+ "created_on,last_modified,expected_actor",
+ [
+ (1586847600000, 1586847600000, "urn:li:corpuser:datahub"),
+ (None, None, "urn:li:corpuser:datahub"),
+ (1586847600000, None, "urn:li:corpuser:datahub"),
+ (None, 1586847600000, "urn:li:corpuser:datahub"),
+ ],
+ ids=["both_timestamps", "no_timestamps", "only_created", "only_modified"],
+)
@freeze_time("2020-04-14 07:00:00")
-def test_datajob_patch_builder():
+def test_datajob_patch_builder(created_on, last_modified, expected_actor):
+ def make_edge_or_urn(urn: str) -> Union[EdgeClass, str]:
+ if created_on or last_modified:
+ return EdgeClass(
+ destinationUrn=str(urn),
+ created=(
+ AuditStampClass(
+ time=created_on,
+ actor=expected_actor,
+ )
+ if created_on
+ else None
+ ),
+ lastModified=(
+ AuditStampClass(
+ time=last_modified,
+ actor=expected_actor,
+ )
+ if last_modified
+ else None
+ ),
+ )
+ return urn
+
+ def get_edge_expectation(urn: str) -> Dict[str, Any]:
+ if created_on or last_modified:
+ expected = {
+ "destinationUrn": str(urn),
+ "created": (
+ AuditStampClass(
+ time=created_on,
+ actor=expected_actor,
+ ).to_obj()
+ if created_on
+ else None
+ ),
+ "lastModified": (
+ AuditStampClass(
+ time=last_modified,
+ actor=expected_actor,
+ ).to_obj()
+ if last_modified
+ else None
+ ),
+ }
+ # filter out None values
+ return {k: v for k, v in expected.items() if v is not None}
+ return {"destinationUrn": str(urn)}
+
flow_urn = make_data_flow_urn(
orchestrator="nifi", flow_id="252C34e5af19-0192-1000-b248-b1abee565b5d"
)
@@ -193,13 +254,19 @@ def test_datajob_patch_builder():
patcher = DataJobPatchBuilder(job_urn)
patcher.add_output_dataset(
- "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)"
+ make_edge_or_urn(
+ "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)"
+ )
)
patcher.add_output_dataset(
- "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)"
+ make_edge_or_urn(
+ "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)"
+ )
)
patcher.add_output_dataset(
- "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)"
+ make_edge_or_urn(
+ "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)"
+ )
)
assert patcher.build() == [
@@ -214,47 +281,23 @@ def test_datajob_patch_builder():
{
"op": "add",
"path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder1,DEV)",
- "value": {
- "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)",
- "created": {
- "time": 1586847600000,
- "actor": "urn:li:corpuser:datahub",
- },
- "lastModified": {
- "time": 1586847600000,
- "actor": "urn:li:corpuser:datahub",
- },
- },
+ "value": get_edge_expectation(
+ "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)"
+ ),
},
{
"op": "add",
"path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder3,DEV)",
- "value": {
- "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)",
- "created": {
- "time": 1586847600000,
- "actor": "urn:li:corpuser:datahub",
- },
- "lastModified": {
- "time": 1586847600000,
- "actor": "urn:li:corpuser:datahub",
- },
- },
+ "value": get_edge_expectation(
+ "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)"
+ ),
},
{
"op": "add",
"path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder2,DEV)",
- "value": {
- "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)",
- "created": {
- "time": 1586847600000,
- "actor": "urn:li:corpuser:datahub",
- },
- "lastModified": {
- "time": 1586847600000,
- "actor": "urn:li:corpuser:datahub",
- },
- },
+ "value": get_edge_expectation(
+ "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)"
+ ),
},
]
).encode("utf-8"),
diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/EntityAspect.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/EntityAspect.java
index 976db4133c0043..2b67d5e92f833c 100644
--- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/EntityAspect.java
+++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/EntityAspect.java
@@ -52,6 +52,26 @@ public class EntityAspect {
private String createdFor;
+ @Override
+ public String toString() {
+ return "EntityAspect{"
+ + "urn='"
+ + urn
+ + '\''
+ + ", aspect='"
+ + aspect
+ + '\''
+ + ", version="
+ + version
+ + ", metadata='"
+ + metadata
+ + '\''
+ + ", systemMetadata='"
+ + systemMetadata
+ + '\''
+ + '}';
+ }
+
/**
* Provide a typed EntityAspect without breaking the existing public contract with generic types.
*/
@@ -144,6 +164,11 @@ public EnvelopedAspect toEnvelopedAspects() {
return envelopedAspect;
}
+ @Override
+ public String toString() {
+ return entityAspect.toString();
+ }
+
public static class EntitySystemAspectBuilder {
private EntityAspect.EntitySystemAspect build() {
diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java
index c0d65640df2378..1af9fc1565a456 100644
--- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java
+++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java
@@ -1,6 +1,7 @@
package com.linkedin.metadata.entity.ebean.batch;
import com.linkedin.common.AuditStamp;
+import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
@@ -15,7 +16,9 @@
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.util.Pair;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -29,12 +32,23 @@
import lombok.extern.slf4j.Slf4j;
@Slf4j
-@Getter
@Builder(toBuilder = true)
public class AspectsBatchImpl implements AspectsBatch {
@Nonnull private final Collection<? extends BatchItem> items;
- @Nonnull private final RetrieverContext retrieverContext;
+ @Nonnull private final Collection<? extends BatchItem> nonRepeatedItems;
+ @Getter @Nonnull private final RetrieverContext retrieverContext;
+
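+ // getItems() intentionally returns the de-duplicated view used for validation
+ // and persistence; getInitialItems() preserves the batch exactly as submitted.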
+ @Override
+ @Nonnull
+ public Collection<? extends BatchItem> getItems() {
+ return nonRepeatedItems;
+ }
+
+ @Override
+ public Collection<? extends BatchItem> getInitialItems() {
+ return items;
+ }
/**
* Convert patches to upserts, apply hooks at the aspect and batch level.
@@ -207,14 +221,32 @@ public AspectsBatchImplBuilder mcps(
return this;
}
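+ /**
+ * Removes items that are database-duplicates of the immediately preceding item
+ * for the same (urn, aspect) pair, preserving the order of the remaining items.
+ */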
+ private static <T extends BatchItem> List<T> filterRepeats(Collection<T> items) {
+ List<T> result = new ArrayList<>();
+ Map<Pair<Urn, String>, T> last = new HashMap<>();
+
+ for (T item : items) {
+ Pair<Urn, String> urnAspect = Pair.of(item.getUrn(), item.getAspectName());
+ // Keep the item unless it is a database-duplicate of the previous item
+ // seen for the same (urn, aspect) pair
+ if (!last.containsKey(urnAspect) || !item.isDatabaseDuplicateOf(last.get(urnAspect))) {
+ result.add(item);
+ }
+ last.put(urnAspect, item);
+ }
+
+ return result;
+ }
+
public AspectsBatchImpl build() {
+ this.nonRepeatedItems = filterRepeats(this.items);
+
ValidationExceptionCollection exceptions =
- AspectsBatch.validateProposed(this.items, this.retrieverContext);
+ AspectsBatch.validateProposed(this.nonRepeatedItems, this.retrieverContext);
if (!exceptions.isEmpty()) {
throw new IllegalArgumentException("Failed to validate MCP due to: " + exceptions);
}
- return new AspectsBatchImpl(this.items, this.retrieverContext);
+ return new AspectsBatchImpl(this.items, this.nonRepeatedItems, this.retrieverContext);
}
}
diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImpl.java
index 6f45a36d1daf46..64263859e4aadb 100644
--- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImpl.java
+++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImpl.java
@@ -3,11 +3,13 @@
import com.datahub.util.exception.ModelConversionException;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
+import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.StringMap;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.SystemAspect;
+import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.patch.template.common.GenericPatchTemplate;
@@ -269,6 +271,11 @@ private static RecordTemplate convertToRecordTemplate(
}
}
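+ // Duplicate detection for batch filtering delegates to equals(), which also
+ // compares changeType and auditStamp and uses DataTemplateUtil.areEqual for
+ // the record template contents.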
+ @Override
+ public boolean isDatabaseDuplicateOf(BatchItem other) {
+ return equals(other);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -280,13 +287,15 @@ public boolean equals(Object o) {
ChangeItemImpl that = (ChangeItemImpl) o;
return urn.equals(that.urn)
&& aspectName.equals(that.aspectName)
+ && changeType.equals(that.changeType)
&& Objects.equals(systemMetadata, that.systemMetadata)
- && recordTemplate.equals(that.recordTemplate);
+ && Objects.equals(auditStamp, that.auditStamp)
+ && DataTemplateUtil.areEqual(recordTemplate, that.recordTemplate);
}
@Override
public int hashCode() {
- return Objects.hash(urn, aspectName, systemMetadata, recordTemplate);
+ return Objects.hash(urn, aspectName, changeType, systemMetadata, auditStamp, recordTemplate);
}
@Override
diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/DeleteItemImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/DeleteItemImpl.java
index 9c1ded284fa0bd..40bcb0fa8ed2d1 100644
--- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/DeleteItemImpl.java
+++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/DeleteItemImpl.java
@@ -6,6 +6,7 @@
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.SystemAspect;
+import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.entity.EntityApiUtils;
import com.linkedin.metadata.entity.EntityAspect;
@@ -115,6 +116,11 @@ public DeleteItemImpl build(AspectRetriever aspectRetriever) {
}
}
+ @Override
+ public boolean isDatabaseDuplicateOf(BatchItem other) {
+ return equals(other);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCLItemImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCLItemImpl.java
index a5afd4651ed2c4..85923a28a64be5 100644
--- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCLItemImpl.java
+++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCLItemImpl.java
@@ -5,6 +5,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
+import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.MCLItem;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.entity.AspectUtils;
@@ -158,6 +159,11 @@ private static Pair convertToRecordTemplate(
}
}
+ @Override
+ public boolean isDatabaseDuplicateOf(BatchItem other) {
+ return equals(other);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/PatchItemImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/PatchItemImpl.java
index ec0a8422e3c4a2..2543d99ac6af37 100644
--- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/PatchItemImpl.java
+++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/PatchItemImpl.java
@@ -14,6 +14,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
+import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.batch.PatchMCP;
import com.linkedin.metadata.aspect.patch.template.AspectTemplateEngine;
@@ -216,6 +217,11 @@ public static JsonPatch convertToJsonPatch(MetadataChangeProposal mcp) {
}
}
+ @Override
+ public boolean isDatabaseDuplicateOf(BatchItem other) {
+ return equals(other);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -228,12 +234,13 @@ public boolean equals(Object o) {
return urn.equals(that.urn)
&& aspectName.equals(that.aspectName)
&& Objects.equals(systemMetadata, that.systemMetadata)
+ && auditStamp.equals(that.auditStamp)
&& patch.equals(that.patch);
}
@Override
public int hashCode() {
- return Objects.hash(urn, aspectName, systemMetadata, patch);
+ return Objects.hash(urn, aspectName, systemMetadata, auditStamp, patch);
}
@Override
diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java
index 88187ef159f233..370f1f6f073e65 100644
--- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java
+++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java
@@ -4,6 +4,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
+import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
@@ -86,6 +87,32 @@ public ChangeType getChangeType() {
return metadataChangeProposal.getChangeType();
}
+ @Override
+ public boolean isDatabaseDuplicateOf(BatchItem other) {
+ return equals(other);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ProposedItem that = (ProposedItem) o;
+ return metadataChangeProposal.equals(that.metadataChangeProposal)
+ && auditStamp.equals(that.auditStamp);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = metadataChangeProposal.hashCode();
+ result = 31 * result + auditStamp.hashCode();
+ return result;
+ }
+
public static class ProposedItemBuilder {
public ProposedItem build() {
// Ensure systemMetadata
diff --git a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java
index 96f535f2295aa4..9f57d36f800de3 100644
--- a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java
+++ b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java
@@ -6,6 +6,7 @@
import static org.testng.Assert.assertEquals;
import com.google.common.collect.ImmutableList;
+import com.linkedin.common.AuditStamp;
import com.linkedin.common.FabricType;
import com.linkedin.common.Status;
import com.linkedin.common.urn.DataPlatformUrn;
@@ -220,6 +221,7 @@ public void toUpsertBatchItemsPatchItemTest() {
@Test
public void toUpsertBatchItemsProposedItemTest() {
+ AuditStamp auditStamp = AuditStampUtils.createDefaultAuditStamp();
List<ProposedItem> testItems =
List.of(
ProposedItem.builder()
@@ -239,7 +241,7 @@ public void toUpsertBatchItemsProposedItemTest() {
ByteString.copyString(
"{\"foo\":\"bar\"}", StandardCharsets.UTF_8)))
.setSystemMetadata(new SystemMetadata()))
- .auditStamp(AuditStampUtils.createDefaultAuditStamp())
+ .auditStamp(auditStamp)
.build(),
ProposedItem.builder()
.entitySpec(testRegistry.getEntitySpec(DATASET_ENTITY_NAME))
@@ -258,7 +260,7 @@ public void toUpsertBatchItemsProposedItemTest() {
ByteString.copyString(
"{\"foo\":\"bar\"}", StandardCharsets.UTF_8)))
.setSystemMetadata(new SystemMetadata()))
- .auditStamp(AuditStampUtils.createDefaultAuditStamp())
+ .auditStamp(auditStamp)
.build());
AspectsBatchImpl testBatch =
@@ -280,7 +282,7 @@ public void toUpsertBatchItemsProposedItemTest() {
testRegistry
.getEntitySpec(DATASET_ENTITY_NAME)
.getAspectSpec(STATUS_ASPECT_NAME))
- .auditStamp(AuditStampUtils.createDefaultAuditStamp())
+ .auditStamp(auditStamp)
.systemMetadata(testItems.get(0).getSystemMetadata().setVersion("1"))
.recordTemplate(new Status().setRemoved(false))
.build(mockAspectRetriever),
@@ -295,7 +297,7 @@ public void toUpsertBatchItemsProposedItemTest() {
testRegistry
.getEntitySpec(DATASET_ENTITY_NAME)
.getAspectSpec(STATUS_ASPECT_NAME))
- .auditStamp(AuditStampUtils.createDefaultAuditStamp())
+ .auditStamp(auditStamp)
.systemMetadata(testItems.get(1).getSystemMetadata().setVersion("1"))
.recordTemplate(new Status().setRemoved(false))
.build(mockAspectRetriever))),
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
index bf3481205fb5ab..059a6b7ed0aea3 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
@@ -854,7 +854,7 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
boolean overwrite) {
if (inputBatch.containsDuplicateAspects()) {
- log.warn(String.format("Batch contains duplicates: %s", inputBatch));
+ log.warn("Batch contains duplicates: {}", inputBatch.duplicateAspects());
MetricUtils.counter(EntityServiceImpl.class, "batch_with_duplicate").inc();
}
@@ -968,39 +968,20 @@ private List ingestAspectsToLocalDB(
writeItem.getAspectSpec(),
databaseAspect);
- final UpdateAspectResult result;
/*
This condition is specifically for an older conditional write ingestAspectIfNotPresent()
overwrite is always true otherwise
*/
if (overwrite || databaseAspect == null) {
- result =
- Optional.ofNullable(
- ingestAspectToLocalDB(
- txContext, writeItem, databaseSystemAspect))
- .map(
- optResult ->
- optResult.toBuilder().request(writeItem).build())
- .orElse(null);
-
- } else {
- RecordTemplate oldValue = databaseSystemAspect.getRecordTemplate();
- SystemMetadata oldMetadata = databaseSystemAspect.getSystemMetadata();
- result =
- UpdateAspectResult.builder()
- .urn(writeItem.getUrn())
- .request(writeItem)
- .oldValue(oldValue)
- .newValue(oldValue)
- .oldSystemMetadata(oldMetadata)
- .newSystemMetadata(oldMetadata)
- .operation(MetadataAuditOperation.UPDATE)
- .auditStamp(writeItem.getAuditStamp())
- .maxVersion(databaseAspect.getVersion())
- .build();
+ return Optional.ofNullable(
+ ingestAspectToLocalDB(
+ txContext, writeItem, databaseSystemAspect))
+ .map(
+ optResult -> optResult.toBuilder().request(writeItem).build())
+ .orElse(null);
}
- return result;
+ return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
@@ -1051,7 +1032,8 @@ This condition is specifically for an older conditional write ingestAspectIfNotP
}
} else {
MetricUtils.counter(EntityServiceImpl.class, "batch_empty_transaction").inc();
- log.warn("Empty transaction detected. {}", inputBatch);
+ // This includes no-op batches, e.g. a patch removing non-existent items
+ log.debug("Empty transaction detected");
}
return upsertResults;
@@ -1150,7 +1132,7 @@ public RecordTemplate ingestAspectIfNotPresent(
.build();
List<UpdateAspectResult> ingested = ingestAspects(opContext, aspectsBatch, true, false);
- return ingested.stream().findFirst().get().getNewValue();
+ return ingested.stream().findFirst().map(UpdateAspectResult::getNewValue).orElse(null);
}
/**
@@ -2525,6 +2507,14 @@ private UpdateAspectResult ingestAspectToLocalDB(
@Nonnull final ChangeMCP writeItem,
@Nullable final EntityAspect.EntitySystemAspect databaseAspect) {
+ if (writeItem.getRecordTemplate() == null) {
+ log.error(
+ "Unexpected write of null aspect with name {}, urn {}",
+ writeItem.getAspectName(),
+ writeItem.getUrn());
+ return null;
+ }
+
// Set the "last run id" to be the run id provided with the new system metadata. This will be
// stored in index
// for all aspects that have a run id, regardless of whether they change.
@@ -2533,9 +2523,6 @@ private UpdateAspectResult ingestAspectToLocalDB(
.setLastRunId(writeItem.getSystemMetadata().getRunId(GetMode.NULL), SetMode.IGNORE_NULL);
// 2. Compare the latest existing and new.
- final RecordTemplate databaseValue =
- databaseAspect == null ? null : databaseAspect.getRecordTemplate();
-
final EntityAspect.EntitySystemAspect previousBatchAspect =
(EntityAspect.EntitySystemAspect) writeItem.getPreviousSystemAspect();
final RecordTemplate previousValue =
@@ -2544,45 +2531,86 @@ private UpdateAspectResult ingestAspectToLocalDB(
// 3. If there is no difference between existing and new, we just update
// the lastObserved in system metadata. RunId should stay as the original runId
if (previousValue != null
- && DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) {
+ && DataTemplateUtil.areEqual(previousValue, writeItem.getRecordTemplate())) {
- SystemMetadata latestSystemMetadata = previousBatchAspect.getSystemMetadata();
- latestSystemMetadata.setLastObserved(writeItem.getSystemMetadata().getLastObserved());
- latestSystemMetadata.setLastRunId(
- writeItem.getSystemMetadata().getLastRunId(GetMode.NULL), SetMode.IGNORE_NULL);
-
- previousBatchAspect
- .getEntityAspect()
- .setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadata));
-
- log.info(
- "Ingesting aspect with name {}, urn {}",
- previousBatchAspect.getAspectName(),
- previousBatchAspect.getUrn());
- aspectDao.saveAspect(txContext, previousBatchAspect.getEntityAspect(), false);
-
- // metrics
- aspectDao.incrementWriteMetrics(
- previousBatchAspect.getAspectName(),
- 1,
- previousBatchAspect.getMetadataRaw().getBytes(StandardCharsets.UTF_8).length);
+ Optional<SystemMetadata> latestSystemMetadataDiff =
+ systemMetadataDiff(
+ txContext,
+ previousBatchAspect.getSystemMetadata(),
+ writeItem.getSystemMetadata(),
+ databaseAspect == null ? null : databaseAspect.getSystemMetadata());
+
+ if (latestSystemMetadataDiff.isPresent()) {
+ // Update previous version since that is what is re-written
+ previousBatchAspect
+ .getEntityAspect()
+ .setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadataDiff.get()));
+
+ // Inserts & update order is not guaranteed, flush the insert for potential updates within
+ // same tx
+ if (databaseAspect == null && txContext != null) {
+ conditionalLogLevel(
+ txContext,
+ String.format(
+ "Flushing for systemMetadata update aspect with name %s, urn %s",
+ writeItem.getAspectName(), writeItem.getUrn()));
+ txContext.flush();
+ }
- return UpdateAspectResult.builder()
- .urn(writeItem.getUrn())
- .oldValue(previousValue)
- .newValue(previousValue)
- .oldSystemMetadata(previousBatchAspect.getSystemMetadata())
- .newSystemMetadata(latestSystemMetadata)
- .operation(MetadataAuditOperation.UPDATE)
- .auditStamp(writeItem.getAuditStamp())
- .maxVersion(0)
- .build();
+ conditionalLogLevel(
+ txContext,
+ String.format(
+ "Update aspect with name %s, urn %s, txContext: %s, databaseAspect: %s, newAspect: %s",
+ previousBatchAspect.getAspectName(),
+ previousBatchAspect.getUrn(),
+ txContext != null,
+ databaseAspect == null ? null : databaseAspect.getEntityAspect(),
+ previousBatchAspect.getEntityAspect()));
+ aspectDao.saveAspect(txContext, previousBatchAspect.getEntityAspect(), false);
+
+ // metrics
+ aspectDao.incrementWriteMetrics(
+ previousBatchAspect.getAspectName(),
+ 1,
+ previousBatchAspect.getMetadataRaw().getBytes(StandardCharsets.UTF_8).length);
+
+ return UpdateAspectResult.builder()
+ .urn(writeItem.getUrn())
+ .oldValue(previousValue)
+ .newValue(previousValue)
+ .oldSystemMetadata(previousBatchAspect.getSystemMetadata())
+ .newSystemMetadata(latestSystemMetadataDiff.get())
+ .operation(MetadataAuditOperation.UPDATE)
+ .auditStamp(writeItem.getAuditStamp())
+ .maxVersion(0)
+ .build();
+ } else {
+ MetricUtils.counter(EntityServiceImpl.class, "batch_with_noop_sysmetadata").inc();
+ return null;
+ }
}
// 4. Save the newValue as the latest version
- if (!DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) {
- log.debug(
- "Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn());
+ if (writeItem.getRecordTemplate() != null
+ && !DataTemplateUtil.areEqual(previousValue, writeItem.getRecordTemplate())) {
+ conditionalLogLevel(
+ txContext,
+ String.format(
+ "Insert aspect with name %s, urn %s", writeItem.getAspectName(), writeItem.getUrn()));
+
+ // Inserts & update order is not guaranteed, flush the insert for potential updates within
+ // same tx
+ if (databaseAspect == null
+ && !ASPECT_LATEST_VERSION.equals(writeItem.getNextAspectVersion())
+ && txContext != null) {
+ conditionalLogLevel(
+ txContext,
+ String.format(
+ "Flushing for update aspect with name %s, urn %s",
+ writeItem.getAspectName(), writeItem.getUrn()));
+ txContext.flush();
+ }
+
String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate());
long versionOfOld =
aspectDao.saveLatestAspect(
@@ -2630,4 +2658,41 @@ private static boolean shouldAspectEmitChangeLog(@Nonnull final AspectSpec aspec
aspectSpec.getRelationshipFieldSpecs();
return relationshipFieldSpecs.stream().anyMatch(RelationshipFieldSpec::isLineageRelationship);
}
+
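+ /**
+ * Computes the system metadata to persist when the aspect value itself is
+ * unchanged. Returns empty when the merged metadata equals both the previous
+ * batch value and the database value, i.e. the write would be a pure no-op.
+ */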
+ private static Optional<SystemMetadata> systemMetadataDiff(
+ @Nullable TransactionContext txContext,
+ @Nullable SystemMetadata previous,
+ @Nonnull SystemMetadata current,
+ @Nullable SystemMetadata database) {
+
+ SystemMetadata latestSystemMetadata = GenericRecordUtils.copy(previous, SystemMetadata.class);
+
+ latestSystemMetadata.setLastRunId(previous.getRunId(), SetMode.REMOVE_IF_NULL);
+ latestSystemMetadata.setLastObserved(current.getLastObserved(), SetMode.IGNORE_NULL);
+ latestSystemMetadata.setRunId(current.getRunId(), SetMode.REMOVE_IF_NULL);
+
+ if (!DataTemplateUtil.areEqual(latestSystemMetadata, previous)
+ && !DataTemplateUtil.areEqual(latestSystemMetadata, database)) {
+
+ conditionalLogLevel(
+ txContext,
+ String.format(
+ "systemMetdataDiff: %s != %s AND %s",
+ RecordUtils.toJsonString(latestSystemMetadata),
+ previous == null ? null : RecordUtils.toJsonString(previous),
+ database == null ? null : RecordUtils.toJsonString(database)));
+
+ return Optional.of(latestSystemMetadata);
+ }
+
+ return Optional.empty();
+ }
+
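+ /** Logs at warn level once a transaction has failed more than once, otherwise at debug. */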
+ private static void conditionalLogLevel(@Nullable TransactionContext txContext, String message) {
+ if (txContext != null && txContext.getFailedAttempts() > 1) {
+ log.warn(message);
+ } else {
+ log.debug(message);
+ }
+ }
}
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java
index 69f2f1c8981c03..6897c9152e9a25 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java
@@ -66,4 +66,10 @@ public void commitAndContinue() {
}
success();
}
+
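+ /** Flushes pending changes to the database within the active transaction, if one exists. */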
+ public void flush() {
+ if (tx != null) {
+ tx.flush();
+ }
+ }
}
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java
index 9e7387947a9547..a00482acda62e2 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java
@@ -590,7 +590,7 @@ public long saveLatestAspect(
// Save oldValue as the largest version + 1
long largestVersion = ASPECT_LATEST_VERSION;
BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED);
- if (oldAspectMetadata != null && oldTime != null) {
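+ // Archive the previous value only when a prior version actually exists,
+ // i.e. the next version is beyond the latest (version 0) slot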
+ if (!ASPECT_LATEST_VERSION.equals(nextVersion) && oldTime != null) {
largestVersion = nextVersion;
final EntityAspect aspect =
new EntityAspect(
@@ -616,7 +616,7 @@ public long saveLatestAspect(
newTime,
newActor,
newImpersonator);
- batch = batch.add(generateSaveStatement(aspect, oldAspectMetadata == null));
+ batch = batch.add(generateSaveStatement(aspect, ASPECT_LATEST_VERSION.equals(nextVersion)));
_cqlSession.execute(batch);
return largestVersion;
}
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java
index 6233bf5e0e35cf..729d0e61cb2c00 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java
@@ -165,7 +165,7 @@ public long saveLatestAspect(
}
// Save oldValue as the largest version + 1
long largestVersion = ASPECT_LATEST_VERSION;
- if (oldAspectMetadata != null && oldTime != null) {
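+ // As in the Cassandra DAO: archive the previous value only when a prior version exists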
+ if (!ASPECT_LATEST_VERSION.equals(nextVersion) && oldTime != null) {
largestVersion = nextVersion;
saveAspect(
txContext,
@@ -191,7 +191,7 @@ public long saveLatestAspect(
newTime,
newSystemMetadata,
ASPECT_LATEST_VERSION,
- oldAspectMetadata == null);
+ ASPECT_LATEST_VERSION.equals(nextVersion));
return largestVersion;
}
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/AspectGenerationUtils.java b/metadata-io/src/test/java/com/linkedin/metadata/AspectGenerationUtils.java
index 346a1eef845923..395c040f288111 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/AspectGenerationUtils.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/AspectGenerationUtils.java
@@ -34,19 +34,19 @@ public static SystemMetadata createSystemMetadata() {
}
@Nonnull
- public static SystemMetadata createSystemMetadata(long nextAspectVersion) {
+ public static SystemMetadata createSystemMetadata(int nextAspectVersion) {
return createSystemMetadata(
1625792689, "run-123", "run-123", String.valueOf(nextAspectVersion));
}
@Nonnull
- public static SystemMetadata createSystemMetadata(long lastObserved, @Nonnull String runId) {
+ public static SystemMetadata createSystemMetadata(int lastObserved, @Nonnull String runId) {
return createSystemMetadata(lastObserved, runId, runId, null);
}
@Nonnull
public static SystemMetadata createSystemMetadata(
- long lastObserved,
+ int lastObserved, // must be an int for test comparisons
@Nonnull String runId,
@Nonnull String lastRunId,
@Nullable String version) {
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java
index a1000fd02abfe1..aa42545fa0e46f 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java
@@ -1,10 +1,6 @@
package com.linkedin.metadata.entity;
-import static com.linkedin.metadata.Constants.APP_SOURCE;
import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME;
-import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
-import static com.linkedin.metadata.Constants.GLOBAL_TAGS_ASPECT_NAME;
-import static com.linkedin.metadata.Constants.METADATA_TESTS_SOURCE;
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
@@ -12,36 +8,27 @@
import static org.testng.Assert.assertTrue;
import com.linkedin.common.AuditStamp;
-import com.linkedin.common.GlobalTags;
import com.linkedin.common.Status;
-import com.linkedin.common.TagAssociation;
-import com.linkedin.common.TagAssociationArray;
-import com.linkedin.common.urn.TagUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
-import com.linkedin.data.template.StringMap;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.identity.CorpUserInfo;
import com.linkedin.metadata.AspectGenerationUtils;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.EbeanTestUtils;
-import com.linkedin.metadata.aspect.patch.GenericJsonPatch;
-import com.linkedin.metadata.aspect.patch.PatchOperationType;
import com.linkedin.metadata.config.EbeanConfiguration;
import com.linkedin.metadata.config.PreProcessHooks;
import com.linkedin.metadata.entity.ebean.EbeanAspectDao;
import com.linkedin.metadata.entity.ebean.EbeanRetentionService;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
-import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl;
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.key.CorpUserKey;
import com.linkedin.metadata.models.registry.EntityRegistryException;
import com.linkedin.metadata.query.ListUrnsResult;
import com.linkedin.metadata.service.UpdateIndicesService;
-import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.PegasusUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
@@ -64,7 +51,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Triple;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
@@ -396,360 +382,6 @@ public void testSystemMetadataDuplicateKey() throws Exception {
"Expected version 0 with systemMeta version 3 accounting for the the collision");
}
- @Test
- public void testBatchDuplicate() throws Exception {
- Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:batchDuplicateTest");
- SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();
- ChangeItemImpl item1 =
- ChangeItemImpl.builder()
- .urn(entityUrn)
- .aspectName(STATUS_ASPECT_NAME)
- .recordTemplate(new Status().setRemoved(true))
- .systemMetadata(systemMetadata.copy())
- .auditStamp(TEST_AUDIT_STAMP)
- .build(TestOperationContexts.emptyAspectRetriever(null));
- ChangeItemImpl item2 =
- ChangeItemImpl.builder()
- .urn(entityUrn)
- .aspectName(STATUS_ASPECT_NAME)
- .recordTemplate(new Status().setRemoved(false))
- .systemMetadata(systemMetadata.copy())
- .auditStamp(TEST_AUDIT_STAMP)
- .build(TestOperationContexts.emptyAspectRetriever(null));
- _entityServiceImpl.ingestAspects(
- opContext,
- AspectsBatchImpl.builder()
- .retrieverContext(opContext.getRetrieverContext().get())
- .items(List.of(item1, item2))
- .build(),
- false,
- true);
-
- // List aspects urns
- ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 2);
-
- assertEquals(batch.getStart().intValue(), 0);
- assertEquals(batch.getCount().intValue(), 1);
- assertEquals(batch.getTotal().intValue(), 1);
- assertEquals(batch.getEntities().size(), 1);
- assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString());
-
- EnvelopedAspect envelopedAspect =
- _entityServiceImpl.getLatestEnvelopedAspect(
- opContext, CORP_USER_ENTITY_NAME, entityUrn, STATUS_ASPECT_NAME);
- assertEquals(
- envelopedAspect.getSystemMetadata().getVersion(),
- "2",
- "Expected version 2 accounting for duplicates");
- assertEquals(
- envelopedAspect.getValue().toString(),
- "{removed=false}",
- "Expected 2nd item to be the latest");
- }
-
- @Test
- public void testBatchPatchWithTrailingNoOp() throws Exception {
- Urn entityUrn =
- UrnUtils.getUrn(
- "urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchWithTrailingNoOp,PROD)");
- TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1");
- Urn tag2 = UrnUtils.getUrn("urn:li:tag:tag2");
- Urn tagOther = UrnUtils.getUrn("urn:li:tag:other");
-
- SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();
-
- ChangeItemImpl initialAspectTag1 =
- ChangeItemImpl.builder()
- .urn(entityUrn)
- .aspectName(GLOBAL_TAGS_ASPECT_NAME)
- .recordTemplate(
- new GlobalTags()
- .setTags(new TagAssociationArray(new TagAssociation().setTag(tag1))))
- .systemMetadata(systemMetadata.copy())
- .auditStamp(TEST_AUDIT_STAMP)
- .build(TestOperationContexts.emptyAspectRetriever(null));
-
- PatchItemImpl patchAdd2 =
- PatchItemImpl.builder()
- .urn(entityUrn)
- .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
- .aspectName(GLOBAL_TAGS_ASPECT_NAME)
- .aspectSpec(
- _testEntityRegistry
- .getEntitySpec(DATASET_ENTITY_NAME)
- .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
- .patch(
- GenericJsonPatch.builder()
- .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
- .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2)))
- .build()
- .getJsonPatch())
- .auditStamp(AuditStampUtils.createDefaultAuditStamp())
- .build(_testEntityRegistry);
-
- PatchItemImpl patchRemoveNonExistent =
- PatchItemImpl.builder()
- .urn(entityUrn)
- .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
- .aspectName(GLOBAL_TAGS_ASPECT_NAME)
- .aspectSpec(
- _testEntityRegistry
- .getEntitySpec(DATASET_ENTITY_NAME)
- .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
- .patch(
- GenericJsonPatch.builder()
- .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
- .patch(List.of(tagPatchOp(PatchOperationType.REMOVE, tagOther)))
- .build()
- .getJsonPatch())
- .auditStamp(AuditStampUtils.createDefaultAuditStamp())
- .build(_testEntityRegistry);
-
- // establish base entity
- _entityServiceImpl.ingestAspects(
- opContext,
- AspectsBatchImpl.builder()
- .retrieverContext(opContext.getRetrieverContext().get())
- .items(List.of(initialAspectTag1))
- .build(),
- false,
- true);
-
- _entityServiceImpl.ingestAspects(
- opContext,
- AspectsBatchImpl.builder()
- .retrieverContext(opContext.getRetrieverContext().get())
- .items(List.of(patchAdd2, patchRemoveNonExistent))
- .build(),
- false,
- true);
-
- // List aspects urns
- ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
-
- assertEquals(batch.getStart().intValue(), 0);
- assertEquals(batch.getCount().intValue(), 1);
- assertEquals(batch.getTotal().intValue(), 1);
- assertEquals(batch.getEntities().size(), 1);
- assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString());
-
- EnvelopedAspect envelopedAspect =
- _entityServiceImpl.getLatestEnvelopedAspect(
- opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
- assertEquals(
- envelopedAspect.getSystemMetadata().getVersion(),
- "3",
- "Expected version 3. 1 - Initial, + 1 add, 1 remove");
- assertEquals(
- new GlobalTags(envelopedAspect.getValue().data())
- .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
- Set.of(tag1, tag2),
- "Expected both tags");
- }
-
- @Test
- public void testBatchPatchAdd() throws Exception {
- Urn entityUrn =
- UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchAdd,PROD)");
- TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1");
- TagUrn tag2 = TagUrn.createFromString("urn:li:tag:tag2");
- TagUrn tag3 = TagUrn.createFromString("urn:li:tag:tag3");
-
- SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();
-
- ChangeItemImpl initialAspectTag1 =
- ChangeItemImpl.builder()
- .urn(entityUrn)
- .aspectName(GLOBAL_TAGS_ASPECT_NAME)
- .recordTemplate(
- new GlobalTags()
- .setTags(new TagAssociationArray(new TagAssociation().setTag(tag1))))
- .systemMetadata(systemMetadata.copy())
- .auditStamp(TEST_AUDIT_STAMP)
- .build(TestOperationContexts.emptyAspectRetriever(null));
-
- PatchItemImpl patchAdd3 =
- PatchItemImpl.builder()
- .urn(entityUrn)
- .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
- .aspectName(GLOBAL_TAGS_ASPECT_NAME)
- .aspectSpec(
- _testEntityRegistry
- .getEntitySpec(DATASET_ENTITY_NAME)
- .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
- .patch(
- GenericJsonPatch.builder()
- .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
- .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag3)))
- .build()
- .getJsonPatch())
- .auditStamp(AuditStampUtils.createDefaultAuditStamp())
- .build(_testEntityRegistry);
-
- PatchItemImpl patchAdd2 =
- PatchItemImpl.builder()
- .urn(entityUrn)
- .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
- .aspectName(GLOBAL_TAGS_ASPECT_NAME)
- .aspectSpec(
- _testEntityRegistry
- .getEntitySpec(DATASET_ENTITY_NAME)
- .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
- .patch(
- GenericJsonPatch.builder()
- .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
- .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2)))
- .build()
- .getJsonPatch())
- .auditStamp(AuditStampUtils.createDefaultAuditStamp())
- .build(_testEntityRegistry);
-
- PatchItemImpl patchAdd1 =
- PatchItemImpl.builder()
- .urn(entityUrn)
- .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
- .aspectName(GLOBAL_TAGS_ASPECT_NAME)
- .aspectSpec(
- _testEntityRegistry
- .getEntitySpec(DATASET_ENTITY_NAME)
- .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
- .patch(
- GenericJsonPatch.builder()
- .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
- .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag1)))
- .build()
- .getJsonPatch())
- .auditStamp(AuditStampUtils.createDefaultAuditStamp())
- .build(_testEntityRegistry);
-
- // establish base entity
- _entityServiceImpl.ingestAspects(
- opContext,
- AspectsBatchImpl.builder()
- .retrieverContext(opContext.getRetrieverContext().get())
- .items(List.of(initialAspectTag1))
- .build(),
- false,
- true);
-
- _entityServiceImpl.ingestAspects(
- opContext,
- AspectsBatchImpl.builder()
- .retrieverContext(opContext.getRetrieverContext().get())
- .items(List.of(patchAdd3, patchAdd2, patchAdd1))
- .build(),
- false,
- true);
-
- // List aspects urns
- ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
-
- assertEquals(batch.getStart().intValue(), 0);
- assertEquals(batch.getCount().intValue(), 1);
- assertEquals(batch.getTotal().intValue(), 1);
- assertEquals(batch.getEntities().size(), 1);
- assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString());
-
- EnvelopedAspect envelopedAspect =
- _entityServiceImpl.getLatestEnvelopedAspect(
- opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
- assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "4", "Expected version 4");
- assertEquals(
- new GlobalTags(envelopedAspect.getValue().data())
- .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
- Set.of(tag1, tag2, tag3),
- "Expected all tags");
- }
-
- @Test
- public void testBatchPatchAddDuplicate() throws Exception {
- Urn entityUrn =
- UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchAdd,PROD)");
- List<TagAssociation> initialTags =
- List.of(
- TagUrn.createFromString("urn:li:tag:__default_large_table"),
- TagUrn.createFromString("urn:li:tag:__default_low_queries"),
- TagUrn.createFromString("urn:li:tag:__default_low_changes"),
- TagUrn.createFromString("urn:li:tag:!10TB+ tables"))
- .stream()
- .map(tag -> new TagAssociation().setTag(tag))
- .collect(Collectors.toList());
- TagUrn tag2 = TagUrn.createFromString("urn:li:tag:$ 1TB+");
-
- SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();
-
- SystemMetadata patchSystemMetadata = new SystemMetadata();
- patchSystemMetadata.setLastObserved(systemMetadata.getLastObserved() + 1);
- patchSystemMetadata.setProperties(new StringMap(Map.of(APP_SOURCE, METADATA_TESTS_SOURCE)));
-
- ChangeItemImpl initialAspectTag1 =
- ChangeItemImpl.builder()
- .urn(entityUrn)
- .aspectName(GLOBAL_TAGS_ASPECT_NAME)
- .recordTemplate(new GlobalTags().setTags(new TagAssociationArray(initialTags)))
- .systemMetadata(systemMetadata.copy())
- .auditStamp(TEST_AUDIT_STAMP)
- .build(TestOperationContexts.emptyAspectRetriever(null));
-
- PatchItemImpl patchAdd2 =
- PatchItemImpl.builder()
- .urn(entityUrn)
- .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
- .aspectName(GLOBAL_TAGS_ASPECT_NAME)
- .aspectSpec(
- _testEntityRegistry
- .getEntitySpec(DATASET_ENTITY_NAME)
- .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
- .patch(
- GenericJsonPatch.builder()
- .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
- .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2)))
- .build()
- .getJsonPatch())
- .systemMetadata(patchSystemMetadata)
- .auditStamp(AuditStampUtils.createDefaultAuditStamp())
- .build(_testEntityRegistry);
-
- // establish base entity
- _entityServiceImpl.ingestAspects(
- opContext,
- AspectsBatchImpl.builder()
- .retrieverContext(opContext.getRetrieverContext().get())
- .items(List.of(initialAspectTag1))
- .build(),
- false,
- true);
-
- _entityServiceImpl.ingestAspects(
- opContext,
- AspectsBatchImpl.builder()
- .retrieverContext(opContext.getRetrieverContext().get())
- .items(List.of(patchAdd2, patchAdd2)) // duplicate
- .build(),
- false,
- true);
-
- // List aspects urns
- ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
-
- assertEquals(batch.getStart().intValue(), 0);
- assertEquals(batch.getCount().intValue(), 1);
- assertEquals(batch.getTotal().intValue(), 1);
- assertEquals(batch.getEntities().size(), 1);
- assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString());
-
- EnvelopedAspect envelopedAspect =
- _entityServiceImpl.getLatestEnvelopedAspect(
- opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
- assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "3", "Expected version 3");
- assertEquals(
- new GlobalTags(envelopedAspect.getValue().data())
- .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
- Stream.concat(initialTags.stream().map(TagAssociation::getTag), Stream.of(tag2))
- .collect(Collectors.toSet()),
- "Expected all tags");
- }
-
@Test
public void dataGeneratorThreadingTest() {
DataGenerator dataGenerator = new DataGenerator(opContext, _entityServiceImpl);
@@ -976,14 +608,4 @@ public void run() {
}
}
}
-
- private static GenericJsonPatch.PatchOp tagPatchOp(PatchOperationType op, Urn tagUrn) {
- GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp();
- patchOp.setOp(op.getValue());
- patchOp.setPath(String.format("/tags/%s", tagUrn));
- if (PatchOperationType.ADD.equals(op)) {
- patchOp.setValue(Map.of("tag", tagUrn.toString()));
- }
- return patchOp;
- }
}
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java
index 654c448fdec946..18d277cacbbe26 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java
@@ -11,14 +11,18 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.AuditStamp;
+import com.linkedin.common.GlobalTags;
import com.linkedin.common.Owner;
import com.linkedin.common.OwnerArray;
import com.linkedin.common.Ownership;
import com.linkedin.common.OwnershipType;
import com.linkedin.common.Status;
+import com.linkedin.common.TagAssociation;
+import com.linkedin.common.TagAssociationArray;
import com.linkedin.common.UrnArray;
import com.linkedin.common.VersionedUrn;
import com.linkedin.common.urn.CorpuserUrn;
+import com.linkedin.common.urn.TagUrn;
import com.linkedin.common.urn.TupleKey;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
@@ -42,8 +46,11 @@
import com.linkedin.metadata.aspect.CorpUserAspect;
import com.linkedin.metadata.aspect.CorpUserAspectArray;
import com.linkedin.metadata.aspect.VersionedAspect;
+import com.linkedin.metadata.aspect.patch.GenericJsonPatch;
+import com.linkedin.metadata.aspect.patch.PatchOperationType;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
+import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.entity.validation.ValidationApiUtils;
import com.linkedin.metadata.entity.validation.ValidationException;
@@ -52,10 +59,12 @@
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistryException;
+import com.linkedin.metadata.query.ListUrnsResult;
import com.linkedin.metadata.run.AspectRowSummary;
import com.linkedin.metadata.service.UpdateIndicesService;
import com.linkedin.metadata.snapshot.CorpUserSnapshot;
import com.linkedin.metadata.snapshot.Snapshot;
+import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.GenericAspect;
@@ -605,6 +614,9 @@ public void testReingestLineageAspect() throws Exception {
entityUrn,
_testEntityRegistry.getEntitySpec(entityUrn.getEntityType()).getKeyAspectSpec())));
+ SystemMetadata futureSystemMetadata = AspectGenerationUtils.createSystemMetadata(1);
+ futureSystemMetadata.setLastObserved(futureSystemMetadata.getLastObserved() + 1);
+
final MetadataChangeLog restateChangeLog = new MetadataChangeLog();
restateChangeLog.setEntityType(entityUrn.getEntityType());
restateChangeLog.setEntityUrn(entityUrn);
@@ -612,10 +624,10 @@ public void testReingestLineageAspect() throws Exception {
restateChangeLog.setAspectName(aspectName1);
restateChangeLog.setCreated(TEST_AUDIT_STAMP);
restateChangeLog.setAspect(aspect);
- restateChangeLog.setSystemMetadata(AspectGenerationUtils.createSystemMetadata(1));
+ restateChangeLog.setSystemMetadata(futureSystemMetadata);
restateChangeLog.setPreviousAspectValue(aspect);
restateChangeLog.setPreviousSystemMetadata(
- simulatePullFromDB(initialSystemMetadata, SystemMetadata.class));
+ simulatePullFromDB(futureSystemMetadata, SystemMetadata.class));
restateChangeLog.setEntityKeyAspect(
GenericRecordUtils.serializeAspect(
EntityKeyUtils.convertUrnToEntityKey(
@@ -636,11 +648,7 @@ public void testReingestLineageAspect() throws Exception {
clearInvocations(_mockProducer);
_entityServiceImpl.ingestAspects(
- opContext,
- entityUrn,
- pairToIngest,
- TEST_AUDIT_STAMP,
- AspectGenerationUtils.createSystemMetadata());
+ opContext, entityUrn, pairToIngest, TEST_AUDIT_STAMP, futureSystemMetadata);
verify(_mockProducer, times(1))
.produceMetadataChangeLog(
@@ -682,6 +690,12 @@ public void testReingestLineageProposal() throws Exception {
initialChangeLog.setAspect(genericAspect);
initialChangeLog.setSystemMetadata(metadata1);
+ SystemMetadata futureSystemMetadata = AspectGenerationUtils.createSystemMetadata(1);
+ futureSystemMetadata.setLastObserved(futureSystemMetadata.getLastObserved() + 1);
+
+ MetadataChangeProposal mcp2 = new MetadataChangeProposal(mcp1.data().copy());
+ mcp2.getSystemMetadata().setLastObserved(futureSystemMetadata.getLastObserved());
+
final MetadataChangeLog restateChangeLog = new MetadataChangeLog();
restateChangeLog.setEntityType(entityUrn.getEntityType());
restateChangeLog.setEntityUrn(entityUrn);
@@ -689,9 +703,10 @@ public void testReingestLineageProposal() throws Exception {
restateChangeLog.setAspectName(aspectName1);
restateChangeLog.setCreated(TEST_AUDIT_STAMP);
restateChangeLog.setAspect(genericAspect);
- restateChangeLog.setSystemMetadata(AspectGenerationUtils.createSystemMetadata(1));
+ restateChangeLog.setSystemMetadata(futureSystemMetadata);
restateChangeLog.setPreviousAspectValue(genericAspect);
- restateChangeLog.setPreviousSystemMetadata(simulatePullFromDB(metadata1, SystemMetadata.class));
+ restateChangeLog.setPreviousSystemMetadata(
+ simulatePullFromDB(futureSystemMetadata, SystemMetadata.class));
Map<String, RecordTemplate> latestAspects =
_entityServiceImpl.getLatestAspectsForUrn(
@@ -706,7 +721,7 @@ public void testReingestLineageProposal() throws Exception {
// unless invocations are cleared
clearInvocations(_mockProducer);
- _entityServiceImpl.ingestProposal(opContext, mcp1, TEST_AUDIT_STAMP, false);
+ _entityServiceImpl.ingestProposal(opContext, mcp2, TEST_AUDIT_STAMP, false);
verify(_mockProducer, times(1))
.produceMetadataChangeLog(
@@ -1390,7 +1405,7 @@ public void testIngestSameAspect() throws AssertionError {
SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(1625792689, "run-123");
SystemMetadata metadata2 = AspectGenerationUtils.createSystemMetadata(1635792689, "run-456");
SystemMetadata metadata3 =
- AspectGenerationUtils.createSystemMetadata(1635792689, "run-123", "run-456", "1");
+ AspectGenerationUtils.createSystemMetadata(1635792689, "run-456", "run-123", "1");
List<ChangeItemImpl> items =
List.of(
@@ -1482,6 +1497,9 @@ public void testIngestSameAspect() throws AssertionError {
assertTrue(
DataTemplateUtil.areEqual(
+ EntityApiUtils.parseSystemMetadata(readAspectDao2.getSystemMetadata()), metadata3),
+ String.format(
+ "Expected %s == %s",
EntityApiUtils.parseSystemMetadata(readAspectDao2.getSystemMetadata()), metadata3));
verify(_mockProducer, times(0))
@@ -2179,6 +2197,474 @@ public void testExists() throws Exception {
Set.of(existentUrn, noStatusUrn, softDeletedUrn));
}
+ @Test
+ public void testBatchDuplicate() throws Exception {
+ Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:batchDuplicateTest");
+ SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();
+ ChangeItemImpl item1 =
+ ChangeItemImpl.builder()
+ .urn(entityUrn)
+ .aspectName(STATUS_ASPECT_NAME)
+ .recordTemplate(new Status().setRemoved(true))
+ .systemMetadata(systemMetadata.copy())
+ .auditStamp(TEST_AUDIT_STAMP)
+ .build(TestOperationContexts.emptyAspectRetriever(null));
+ ChangeItemImpl item2 =
+ ChangeItemImpl.builder()
+ .urn(entityUrn)
+ .aspectName(STATUS_ASPECT_NAME)
+ .recordTemplate(new Status().setRemoved(false))
+ .systemMetadata(systemMetadata.copy())
+ .auditStamp(TEST_AUDIT_STAMP)
+ .build(TestOperationContexts.emptyAspectRetriever(null));
+ _entityServiceImpl.ingestAspects(
+ opContext,
+ AspectsBatchImpl.builder()
+ .retrieverContext(opContext.getRetrieverContext().get())
+ .items(List.of(item1, item2))
+ .build(),
+ false,
+ true);
+
+ // List aspects urns
+ ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 2);
+
+ assertEquals(batch.getStart().intValue(), 0);
+ assertEquals(batch.getCount().intValue(), 1);
+ assertEquals(batch.getTotal().intValue(), 1);
+ assertEquals(batch.getEntities().size(), 1);
+ assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString());
+
+ EnvelopedAspect envelopedAspect =
+ _entityServiceImpl.getLatestEnvelopedAspect(
+ opContext, CORP_USER_ENTITY_NAME, entityUrn, STATUS_ASPECT_NAME);
+ assertEquals(
+ envelopedAspect.getSystemMetadata().getVersion(),
+ "2",
+ "Expected version 2 after accounting for sequential duplicates");
+ assertEquals(
+ envelopedAspect.getValue().toString(),
+ "{removed=false}",
+ "Expected 2nd item to be the latest");
+ }
+
+ @Test
+ public void testBatchPatchWithTrailingNoOp() throws Exception {
+ Urn entityUrn =
+ UrnUtils.getUrn(
+ "urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchWithTrailingNoOp,PROD)");
+ TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1");
+ Urn tag2 = UrnUtils.getUrn("urn:li:tag:tag2");
+ Urn tagOther = UrnUtils.getUrn("urn:li:tag:other");
+
+ SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();
+
+ ChangeItemImpl initialAspectTag1 =
+ ChangeItemImpl.builder()
+ .urn(entityUrn)
+ .aspectName(GLOBAL_TAGS_ASPECT_NAME)
+ .recordTemplate(
+ new GlobalTags()
+ .setTags(new TagAssociationArray(new TagAssociation().setTag(tag1))))
+ .systemMetadata(systemMetadata.copy())
+ .auditStamp(TEST_AUDIT_STAMP)
+ .build(TestOperationContexts.emptyAspectRetriever(null));
+
+ PatchItemImpl patchAdd2 =
+ PatchItemImpl.builder()
+ .urn(entityUrn)
+ .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
+ .aspectName(GLOBAL_TAGS_ASPECT_NAME)
+ .aspectSpec(
+ _testEntityRegistry
+ .getEntitySpec(DATASET_ENTITY_NAME)
+ .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
+ .patch(
+ GenericJsonPatch.builder()
+ .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
+ .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2)))
+ .build()
+ .getJsonPatch())
+ .auditStamp(AuditStampUtils.createDefaultAuditStamp())
+ .build(_testEntityRegistry);
+
+ PatchItemImpl patchRemoveNonExistent =
+ PatchItemImpl.builder()
+ .urn(entityUrn)
+ .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
+ .aspectName(GLOBAL_TAGS_ASPECT_NAME)
+ .aspectSpec(
+ _testEntityRegistry
+ .getEntitySpec(DATASET_ENTITY_NAME)
+ .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
+ .patch(
+ GenericJsonPatch.builder()
+ .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
+ .patch(List.of(tagPatchOp(PatchOperationType.REMOVE, tagOther)))
+ .build()
+ .getJsonPatch())
+ .auditStamp(AuditStampUtils.createDefaultAuditStamp())
+ .build(_testEntityRegistry);
+
+ // establish base entity
+ _entityServiceImpl.ingestAspects(
+ opContext,
+ AspectsBatchImpl.builder()
+ .retrieverContext(opContext.getRetrieverContext().get())
+ .items(List.of(initialAspectTag1))
+ .build(),
+ false,
+ true);
+
+ _entityServiceImpl.ingestAspects(
+ opContext,
+ AspectsBatchImpl.builder()
+ .retrieverContext(opContext.getRetrieverContext().get())
+ .items(List.of(patchAdd2, patchRemoveNonExistent))
+ .build(),
+ false,
+ true);
+
+ // List aspects urns
+ ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
+
+ assertEquals(batch.getStart().intValue(), 0);
+ assertEquals(batch.getCount().intValue(), 1);
+ assertEquals(batch.getTotal().intValue(), 1);
+ assertEquals(batch.getEntities().size(), 1);
+ assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString());
+
+ EnvelopedAspect envelopedAspect =
+ _entityServiceImpl.getLatestEnvelopedAspect(
+ opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
+ assertEquals(
+ envelopedAspect.getSystemMetadata().getVersion(),
+ "2",
+ "Expected version 3. 1 - Initial, + 1 add, 1 remove");
+ assertEquals(
+ new GlobalTags(envelopedAspect.getValue().data())
+ .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
+ Set.of(tag1, tag2),
+ "Expected both tags");
+ }
+
+ @Test
+ public void testBatchPatchAdd() throws Exception {
+ Urn entityUrn =
+ UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchAdd,PROD)");
+ TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1");
+ TagUrn tag2 = TagUrn.createFromString("urn:li:tag:tag2");
+ TagUrn tag3 = TagUrn.createFromString("urn:li:tag:tag3");
+
+ SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();
+
+ ChangeItemImpl initialAspectTag1 =
+ ChangeItemImpl.builder()
+ .urn(entityUrn)
+ .aspectName(GLOBAL_TAGS_ASPECT_NAME)
+ .recordTemplate(
+ new GlobalTags()
+ .setTags(new TagAssociationArray(new TagAssociation().setTag(tag1))))
+ .systemMetadata(systemMetadata.copy())
+ .auditStamp(TEST_AUDIT_STAMP)
+ .build(TestOperationContexts.emptyAspectRetriever(null));
+
+ PatchItemImpl patchAdd3 =
+ PatchItemImpl.builder()
+ .urn(entityUrn)
+ .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
+ .aspectName(GLOBAL_TAGS_ASPECT_NAME)
+ .aspectSpec(
+ _testEntityRegistry
+ .getEntitySpec(DATASET_ENTITY_NAME)
+ .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
+ .patch(
+ GenericJsonPatch.builder()
+ .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
+ .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag3)))
+ .build()
+ .getJsonPatch())
+ .auditStamp(AuditStampUtils.createDefaultAuditStamp())
+ .build(_testEntityRegistry);
+
+ PatchItemImpl patchAdd2 =
+ PatchItemImpl.builder()
+ .urn(entityUrn)
+ .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
+ .aspectName(GLOBAL_TAGS_ASPECT_NAME)
+ .aspectSpec(
+ _testEntityRegistry
+ .getEntitySpec(DATASET_ENTITY_NAME)
+ .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
+ .patch(
+ GenericJsonPatch.builder()
+ .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
+ .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2)))
+ .build()
+ .getJsonPatch())
+ .auditStamp(AuditStampUtils.createDefaultAuditStamp())
+ .build(_testEntityRegistry);
+
+ PatchItemImpl patchAdd1 =
+ PatchItemImpl.builder()
+ .urn(entityUrn)
+ .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
+ .aspectName(GLOBAL_TAGS_ASPECT_NAME)
+ .aspectSpec(
+ _testEntityRegistry
+ .getEntitySpec(DATASET_ENTITY_NAME)
+ .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
+ .patch(
+ GenericJsonPatch.builder()
+ .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
+ .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag1)))
+ .build()
+ .getJsonPatch())
+ .auditStamp(AuditStampUtils.createDefaultAuditStamp())
+ .build(_testEntityRegistry);
+
+ // establish base entity
+ _entityServiceImpl.ingestAspects(
+ opContext,
+ AspectsBatchImpl.builder()
+ .retrieverContext(opContext.getRetrieverContext().get())
+ .items(List.of(initialAspectTag1))
+ .build(),
+ false,
+ true);
+
+ _entityServiceImpl.ingestAspects(
+ opContext,
+ AspectsBatchImpl.builder()
+ .retrieverContext(opContext.getRetrieverContext().get())
+ .items(List.of(patchAdd3, patchAdd2, patchAdd1))
+ .build(),
+ false,
+ true);
+
+ // List aspects urns
+ ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
+
+ assertEquals(batch.getStart().intValue(), 0);
+ assertEquals(batch.getCount().intValue(), 1);
+ assertEquals(batch.getTotal().intValue(), 1);
+ assertEquals(batch.getEntities().size(), 1);
+ assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString());
+
+ EnvelopedAspect envelopedAspect =
+ _entityServiceImpl.getLatestEnvelopedAspect(
+ opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
+    assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "3", "Expected version 3");
+ assertEquals(
+ new GlobalTags(envelopedAspect.getValue().data())
+ .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
+ Set.of(tag1, tag2, tag3),
+ "Expected all tags");
+ }
+
+ @Test
+ public void testBatchPatchAddDuplicate() throws Exception {
+ Urn entityUrn =
+ UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchAdd,PROD)");
+    List<TagAssociation> initialTags =
+ List.of(
+ TagUrn.createFromString("urn:li:tag:__default_large_table"),
+ TagUrn.createFromString("urn:li:tag:__default_low_queries"),
+ TagUrn.createFromString("urn:li:tag:__default_low_changes"),
+ TagUrn.createFromString("urn:li:tag:!10TB+ tables"))
+ .stream()
+ .map(tag -> new TagAssociation().setTag(tag))
+ .collect(Collectors.toList());
+ TagUrn tag2 = TagUrn.createFromString("urn:li:tag:$ 1TB+");
+
+ SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();
+
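+    // Give the patch its own system metadata: a strictly newer lastObserved plus a
+    // property marking the write as coming from the metadata tests.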
+ SystemMetadata patchSystemMetadata = new SystemMetadata();
+ patchSystemMetadata.setLastObserved(systemMetadata.getLastObserved() + 1);
+ patchSystemMetadata.setProperties(new StringMap(Map.of(APP_SOURCE, METADATA_TESTS_SOURCE)));
+
+ ChangeItemImpl initialAspectTag1 =
+ ChangeItemImpl.builder()
+ .urn(entityUrn)
+ .aspectName(GLOBAL_TAGS_ASPECT_NAME)
+ .recordTemplate(new GlobalTags().setTags(new TagAssociationArray(initialTags)))
+ .systemMetadata(systemMetadata.copy())
+ .auditStamp(TEST_AUDIT_STAMP)
+ .build(TestOperationContexts.emptyAspectRetriever(null));
+
+ PatchItemImpl patchAdd2 =
+ PatchItemImpl.builder()
+ .urn(entityUrn)
+ .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
+ .aspectName(GLOBAL_TAGS_ASPECT_NAME)
+ .aspectSpec(
+ _testEntityRegistry
+ .getEntitySpec(DATASET_ENTITY_NAME)
+ .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
+ .patch(
+ GenericJsonPatch.builder()
+ .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
+ .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2)))
+ .build()
+ .getJsonPatch())
+ .systemMetadata(patchSystemMetadata)
+ .auditStamp(AuditStampUtils.createDefaultAuditStamp())
+ .build(_testEntityRegistry);
+
+ // establish base entity
+ _entityServiceImpl.ingestAspects(
+ opContext,
+ AspectsBatchImpl.builder()
+ .retrieverContext(opContext.getRetrieverContext().get())
+ .items(List.of(initialAspectTag1))
+ .build(),
+ false,
+ true);
+
+ _entityServiceImpl.ingestAspects(
+ opContext,
+ AspectsBatchImpl.builder()
+ .retrieverContext(opContext.getRetrieverContext().get())
+ .items(List.of(patchAdd2, patchAdd2)) // duplicate
+ .build(),
+ false,
+ true);
+
+    // List entity urns
+ ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
+
+ assertEquals(batch.getStart().intValue(), 0);
+ assertEquals(batch.getCount().intValue(), 1);
+ assertEquals(batch.getTotal().intValue(), 1);
+ assertEquals(batch.getEntities().size(), 1);
+ assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString());
+
+ EnvelopedAspect envelopedAspect =
+ _entityServiceImpl.getLatestEnvelopedAspect(
+ opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
+ assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "2", "Expected version 2");
+ assertEquals(
+ new GlobalTags(envelopedAspect.getValue().data())
+ .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
+ Stream.concat(initialTags.stream().map(TagAssociation::getTag), Stream.of(tag2))
+ .collect(Collectors.toSet()),
+ "Expected all tags");
+ }
+
+ @Test
+ public void testPatchRemoveNonExistent() throws Exception {
+ Urn entityUrn =
+ UrnUtils.getUrn(
+ "urn:li:dataset:(urn:li:dataPlatform:snowflake,testPatchRemoveNonExistent,PROD)");
+ TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1");
+
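+    // Removing a tag from an entity that does not exist yet should still bootstrap the
+    // entity, producing its default aspects plus an empty globalTags aspect.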
+ PatchItemImpl patchRemove =
+ PatchItemImpl.builder()
+ .urn(entityUrn)
+ .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
+ .aspectName(GLOBAL_TAGS_ASPECT_NAME)
+ .aspectSpec(
+ _testEntityRegistry
+ .getEntitySpec(DATASET_ENTITY_NAME)
+ .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
+ .patch(
+ GenericJsonPatch.builder()
+ .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
+ .patch(List.of(tagPatchOp(PatchOperationType.REMOVE, tag1)))
+ .build()
+ .getJsonPatch())
+ .auditStamp(AuditStampUtils.createDefaultAuditStamp())
+ .build(_testEntityRegistry);
+
+    List<UpdateAspectResult> results =
+ _entityServiceImpl.ingestAspects(
+ opContext,
+ AspectsBatchImpl.builder()
+ .retrieverContext(opContext.getRetrieverContext().get())
+ .items(List.of(patchRemove))
+ .build(),
+ false,
+ true);
+
+ assertEquals(results.size(), 4, "Expected default aspects + empty globalTags");
+
+    // List entity urns
+ ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
+
+ assertEquals(batch.getStart().intValue(), 0);
+ assertEquals(batch.getCount().intValue(), 1);
+ assertEquals(batch.getTotal().intValue(), 1);
+ assertEquals(batch.getEntities().size(), 1);
+ assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString());
+
+ EnvelopedAspect envelopedAspect =
+ _entityServiceImpl.getLatestEnvelopedAspect(
+ opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
+    assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "1", "Expected version 1");
+ assertEquals(
+ new GlobalTags(envelopedAspect.getValue().data())
+ .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
+ Set.of(),
+ "Expected empty tags");
+ }
+
+ @Test
+ public void testPatchAddNonExistent() throws Exception {
+ Urn entityUrn =
+ UrnUtils.getUrn(
+ "urn:li:dataset:(urn:li:dataPlatform:snowflake,testPatchAddNonExistent,PROD)");
+ TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1");
+
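+    // Adding a tag to an entity that does not exist yet should create the entity with
+    // its default aspects plus a globalTags aspect containing only the new tag.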
+ PatchItemImpl patchAdd =
+ PatchItemImpl.builder()
+ .urn(entityUrn)
+ .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
+ .aspectName(GLOBAL_TAGS_ASPECT_NAME)
+ .aspectSpec(
+ _testEntityRegistry
+ .getEntitySpec(DATASET_ENTITY_NAME)
+ .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
+ .patch(
+ GenericJsonPatch.builder()
+ .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
+ .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag1)))
+ .build()
+ .getJsonPatch())
+ .auditStamp(AuditStampUtils.createDefaultAuditStamp())
+ .build(_testEntityRegistry);
+
+    List<UpdateAspectResult> results =
+ _entityServiceImpl.ingestAspects(
+ opContext,
+ AspectsBatchImpl.builder()
+ .retrieverContext(opContext.getRetrieverContext().get())
+ .items(List.of(patchAdd))
+ .build(),
+ false,
+ true);
+
+ assertEquals(results.size(), 4, "Expected default aspects + globalTags");
+
+    // List entity urns
+ ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);
+
+ assertEquals(batch.getStart().intValue(), 0);
+ assertEquals(batch.getCount().intValue(), 1);
+ assertEquals(batch.getTotal().intValue(), 1);
+ assertEquals(batch.getEntities().size(), 1);
+ assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString());
+
+ EnvelopedAspect envelopedAspect =
+ _entityServiceImpl.getLatestEnvelopedAspect(
+ opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
+    assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "1", "Expected version 1");
+ assertEquals(
+ new GlobalTags(envelopedAspect.getValue().data())
+ .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
+ Set.of(tag1),
+ "Expected all tags");
+ }
+
@Nonnull
protected com.linkedin.entity.Entity createCorpUserEntity(Urn entityUrn, String email)
throws Exception {
@@ -2210,4 +2696,14 @@ protected Pair getAspectRecor
RecordUtils.toRecordTemplate(clazz, objectMapper.writeValueAsString(aspect));
return new Pair<>(AspectGenerationUtils.getAspectName(aspect), recordTemplate);
}
+
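+  /**
+   * Builds a patch op addressed at /tags/{tagUrn}. ADD ops also carry the TagAssociation
+   * value map; REMOVE ops are path-only.
+   */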
+ private static GenericJsonPatch.PatchOp tagPatchOp(PatchOperationType op, Urn tagUrn) {
+ GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp();
+ patchOp.setOp(op.getValue());
+ patchOp.setPath(String.format("/tags/%s", tagUrn));
+ if (PatchOperationType.ADD.equals(op)) {
+ patchOp.setValue(Map.of("tag", tagUrn.toString()));
+ }
+ return patchOp;
+ }
}
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImplTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImplTest.java
new file mode 100644
index 00000000000000..3f6b301e72aa5a
--- /dev/null
+++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImplTest.java
@@ -0,0 +1,41 @@
+package com.linkedin.metadata.entity.ebean.batch;
+
+import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
+import static org.testng.Assert.assertFalse;
+
+import com.linkedin.common.AuditStamp;
+import com.linkedin.common.Status;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.common.urn.UrnUtils;
+import com.linkedin.metadata.AspectGenerationUtils;
+import com.linkedin.mxe.SystemMetadata;
+import io.datahubproject.test.metadata.context.TestOperationContexts;
+import org.testng.annotations.Test;
+
+public class ChangeItemImplTest {
+ private static final AuditStamp TEST_AUDIT_STAMP = AspectGenerationUtils.createAuditStamp();
+
+ @Test
+ public void testBatchDuplicate() throws Exception {
+ Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:batchDuplicateTest");
+ SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();
+ ChangeItemImpl item1 =
+ ChangeItemImpl.builder()
+ .urn(entityUrn)
+ .aspectName(STATUS_ASPECT_NAME)
+ .recordTemplate(new Status().setRemoved(true))
+ .systemMetadata(systemMetadata.copy())
+ .auditStamp(TEST_AUDIT_STAMP)
+ .build(TestOperationContexts.emptyAspectRetriever(null));
+ ChangeItemImpl item2 =
+ ChangeItemImpl.builder()
+ .urn(entityUrn)
+ .aspectName(STATUS_ASPECT_NAME)
+ .recordTemplate(new Status().setRemoved(false))
+ .systemMetadata(systemMetadata.copy())
+ .auditStamp(TEST_AUDIT_STAMP)
+ .build(TestOperationContexts.emptyAspectRetriever(null));
+
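+    // Same urn and aspect name but different payloads: not a database-level duplicate.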
+ assertFalse(item1.isDatabaseDuplicateOf(item2));
+ }
+}
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/schemafields/sideeffects/SchemaFieldSideEffectTest.java b/metadata-io/src/test/java/com/linkedin/metadata/schemafields/sideeffects/SchemaFieldSideEffectTest.java
index 6139776702c715..1661f5f02ee593 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/schemafields/sideeffects/SchemaFieldSideEffectTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/schemafields/sideeffects/SchemaFieldSideEffectTest.java
@@ -151,7 +151,7 @@ public void schemaMetadataToSchemaFieldKeyTest() {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)"))
.aspectName(SCHEMA_FIELD_ALIASES_ASPECT)
- .changeType(changeType)
+ .changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
@@ -172,7 +172,7 @@ public void schemaMetadataToSchemaFieldKeyTest() {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)"))
.aspectName(SCHEMA_FIELD_ALIASES_ASPECT)
- .changeType(changeType)
+ .changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
@@ -248,7 +248,7 @@ public void statusToSchemaFieldStatusTest() {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)"))
.aspectName(STATUS_ASPECT_NAME)
- .changeType(changeType)
+ .changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
@@ -263,7 +263,7 @@ public void statusToSchemaFieldStatusTest() {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)"))
.aspectName(STATUS_ASPECT_NAME)
- .changeType(changeType)
+ .changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
@@ -324,7 +324,7 @@ public void statusToSchemaFieldStatusTest() {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)"))
.aspectName(STATUS_ASPECT_NAME)
- .changeType(changeType)
+ .changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
@@ -339,7 +339,7 @@ public void statusToSchemaFieldStatusTest() {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)"))
.aspectName(STATUS_ASPECT_NAME)
- .changeType(changeType)
+ .changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
@@ -354,7 +354,7 @@ public void statusToSchemaFieldStatusTest() {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)"))
.aspectName(SCHEMA_FIELD_ALIASES_ASPECT)
- .changeType(changeType)
+ .changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
@@ -375,7 +375,7 @@ public void statusToSchemaFieldStatusTest() {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)"))
.aspectName(SCHEMA_FIELD_ALIASES_ASPECT)
- .changeType(changeType)
+ .changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml
index 4945b36a251c26..15cd126408a7cc 100644
--- a/metadata-service/configuration/src/main/resources/application.yaml
+++ b/metadata-service/configuration/src/main/resources/application.yaml
@@ -159,7 +159,7 @@ ebean:
autoCreateDdl: ${EBEAN_AUTOCREATE:false}
postgresUseIamAuth: ${EBEAN_POSTGRES_USE_AWS_IAM_AUTH:false}
locking:
- enabled: ${EBEAN_LOCKING_ENABLED:true}
+ enabled: ${EBEAN_LOCKING_ENABLED:false}
durationSeconds: ${EBEAN_LOCKING_DURATION_SECONDS:60}
maximumLocks: ${EBEAN_LOCKING_MAXIMUM_LOCKS:20000}
diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/GenericRecordUtils.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/GenericRecordUtils.java
index fafca9b1139731..993edc44daeff1 100644
--- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/GenericRecordUtils.java
+++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/GenericRecordUtils.java
@@ -4,6 +4,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.ByteString;
+import com.linkedin.data.DataMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.Aspect;
import com.linkedin.entity.EntityResponse;
@@ -13,6 +14,8 @@
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.GenericPayload;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;
@@ -23,6 +26,22 @@ public class GenericRecordUtils {
private GenericRecordUtils() {}
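+  /**
+   * Deep-copies a RecordTemplate by cloning its backing DataMap and rebuilding the typed
+   * wrapper through the DataMap constructor that generated templates expose.
+   */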
+  public static <T extends RecordTemplate> T copy(T input, Class<T> clazz) {
+    try {
+      if (input == null) {
+        return null;
+      }
+      Constructor<T> constructor = clazz.getConstructor(DataMap.class);
+      return constructor.newInstance(input.data().copy());
+    } catch (CloneNotSupportedException
+        | InvocationTargetException
+        | NoSuchMethodException
+        | InstantiationException
+        | IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
/** Deserialize the given value into the aspect based on the input aspectSpec */
@Nonnull
public static RecordTemplate deserializeAspect(
diff --git a/smoke-test/tests/patch/test_datajob_patches.py b/smoke-test/tests/patch/test_datajob_patches.py
index eb129e1e032125..e025c8a2aebeb5 100644
--- a/smoke-test/tests/patch/test_datajob_patches.py
+++ b/smoke-test/tests/patch/test_datajob_patches.py
@@ -1,5 +1,7 @@
+import time
import uuid
+import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import make_data_job_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
@@ -136,3 +138,95 @@ def test_datajob_inputoutput_dataset_patch(graph_client):
inputoutput_lineage_read.inputDatasetEdges[0].destinationUrn
== other_dataset_urn
)
+
+
+def test_datajob_multiple_inputoutput_dataset_patch(graph_client):
+ """Test creating a data job with multiple input and output datasets and verifying the aspects."""
+ # Create the data job
+ datajob_urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,training,default),training)"
+
+ # Create input and output dataset URNs
+ input_datasets = ["input_data_1", "input_data_2"]
+ output_datasets = ["output_data_1", "output_data_2"]
+
+ input_dataset_urns = [
+ make_dataset_urn(platform="s3", name=f"test_patch_{dataset}", env="PROD")
+ for dataset in input_datasets
+ ]
+ output_dataset_urns = [
+ make_dataset_urn(platform="s3", name=f"test_patch_{dataset}", env="PROD")
+ for dataset in output_datasets
+ ]
+
+ # Create edges for datasets
+ def make_edge(urn, generate_auditstamp=False):
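+        # By default no lastModified stamp is attached, so re-applying the same patch
+        # yields an identical edge value; stamping a fresh time on every call would
+        # likely break the idempotency comparison at the end of this test.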
+ audit_stamp = models.AuditStampClass(
+ time=int(time.time() * 1000.0),
+ actor="urn:li:corpuser:datahub",
+ )
+ return EdgeClass(
+ destinationUrn=str(urn),
+ lastModified=audit_stamp if generate_auditstamp else None,
+ )
+
+ # Initialize empty input/output lineage
+ initial_lineage = DataJobInputOutputClass(
+ inputDatasets=[], outputDatasets=[], inputDatasetEdges=[], outputDatasetEdges=[]
+ )
+
+ # Emit initial lineage
+ mcpw = MetadataChangeProposalWrapper(entityUrn=datajob_urn, aspect=initial_lineage)
+ graph_client.emit_mcp(mcpw)
+
+ # Create patches for input and output datasets
+ patch_builder = DataJobPatchBuilder(datajob_urn)
+ for input_urn in input_dataset_urns:
+ patch_builder.add_input_dataset(make_edge(input_urn))
+ for output_urn in output_dataset_urns:
+ patch_builder.add_output_dataset(make_edge(output_urn))
+
+ # Apply patches
+ for patch_mcp in patch_builder.build():
+ graph_client.emit_mcp(patch_mcp)
+
+ # Verify the lineage was correctly applied
+ lineage_aspect = graph_client.get_aspect(
+ entity_urn=datajob_urn,
+ aspect_type=DataJobInputOutputClass,
+ )
+
+ # Assert lineage was created
+ assert lineage_aspect is not None
+ assert lineage_aspect.inputDatasetEdges is not None
+ assert lineage_aspect.outputDatasetEdges is not None
+
+ # Verify input datasets
+ assert len(lineage_aspect.inputDatasetEdges) == len(input_datasets)
+ input_urns = {edge.destinationUrn for edge in lineage_aspect.inputDatasetEdges}
+ expected_input_urns = {str(urn) for urn in input_dataset_urns}
+ assert input_urns == expected_input_urns
+
+ # Verify output datasets
+ assert len(lineage_aspect.outputDatasetEdges) == len(output_datasets)
+ output_urns = {edge.destinationUrn for edge in lineage_aspect.outputDatasetEdges}
+ expected_output_urns = {str(urn) for urn in output_dataset_urns}
+ assert output_urns == expected_output_urns
+
+ # Test updating the same datasets again (idempotency)
+ patch_builder = DataJobPatchBuilder(datajob_urn)
+ for input_urn in input_dataset_urns:
+ patch_builder.add_input_dataset(make_edge(input_urn))
+ for output_urn in output_dataset_urns:
+ patch_builder.add_output_dataset(make_edge(output_urn))
+
+ for patch_mcp in patch_builder.build():
+ graph_client.emit_mcp(patch_mcp)
+
+ # Verify the aspect hasn't changed
+ updated_lineage_aspect = graph_client.get_aspect(
+ entity_urn=datajob_urn,
+ aspect_type=DataJobInputOutputClass,
+ )
+
+ assert updated_lineage_aspect is not None
+ assert updated_lineage_aspect.to_obj() == lineage_aspect.to_obj()