Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jun 28, 2024
2 parents fd3c4ef + 62e6b7f commit 07bef8d
Show file tree
Hide file tree
Showing 11 changed files with 628 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand Down Expand Up @@ -48,17 +49,72 @@ public static PluginFactory merge(
PluginFactory b,
@Nullable
BiFunction<PluginConfiguration, List<ClassLoader>, PluginFactory> pluginFactoryProvider) {

if (b.isEmpty()) {
return a;
}
if (a.isEmpty()) {
return b;
}

PluginConfiguration mergedPluginConfig =
PluginConfiguration.merge(a.pluginConfiguration, b.pluginConfiguration);
List<ClassLoader> mergedClassLoaders =
Stream.concat(a.getClassLoaders().stream(), b.getClassLoaders().stream())
.collect(Collectors.toList());

if (pluginFactoryProvider != null) {
return pluginFactoryProvider.apply(mergedPluginConfig, mergedClassLoaders);
} else {
return PluginFactory.withCustomClasspath(mergedPluginConfig, mergedClassLoaders);
if (!a.hasLoadedPlugins() && !b.hasLoadedPlugins()) {
if (pluginFactoryProvider != null) {
return pluginFactoryProvider.apply(mergedPluginConfig, mergedClassLoaders);
} else {
if (mergedPluginConfig
.streamAll()
.anyMatch(config -> config.getSpring() != null && config.getSpring().isEnabled())) {
throw new IllegalStateException(
"Unexpected Spring configuration found without a provided Spring Plugin Factory");
}
return PluginFactory.withCustomClasspath(mergedPluginConfig, mergedClassLoaders);
}
}

PluginFactory loadedA = a.hasLoadedPlugins() ? a : a.loadPlugins();
PluginFactory loadedB = b.hasLoadedPlugins() ? b : b.loadPlugins();

return new PluginFactory(
mergedPluginConfig,
mergedClassLoaders,
Stream.concat(
loadedA.aspectPayloadValidators.stream()
.filter(
aPlugin ->
loadedB.pluginConfiguration.getAspectPayloadValidators().stream()
.noneMatch(bConfig -> aPlugin.getConfig().isDisabledBy(bConfig))),
loadedB.aspectPayloadValidators.stream())
.collect(Collectors.toList()),
Stream.concat(
loadedA.mutationHooks.stream()
.filter(
aPlugin ->
loadedB.pluginConfiguration.getMutationHooks().stream()
.noneMatch(bConfig -> aPlugin.getConfig().isDisabledBy(bConfig))),
loadedB.mutationHooks.stream())
.collect(Collectors.toList()),
Stream.concat(
loadedA.mclSideEffects.stream()
.filter(
aPlugin ->
loadedB.pluginConfiguration.getMclSideEffects().stream()
.noneMatch(bConfig -> aPlugin.getConfig().isDisabledBy(bConfig))),
loadedB.mclSideEffects.stream())
.collect(Collectors.toList()),
Stream.concat(
loadedA.mcpSideEffects.stream()
.filter(
aPlugin ->
loadedB.pluginConfiguration.getMcpSideEffects().stream()
.noneMatch(bConfig -> aPlugin.getConfig().isDisabledBy(bConfig))),
loadedB.mcpSideEffects.stream())
.collect(Collectors.toList()));
}

@Getter private final PluginConfiguration pluginConfiguration;
Expand All @@ -77,22 +133,62 @@ public PluginFactory(
pluginConfiguration == null ? PluginConfiguration.EMPTY : pluginConfiguration;
}

public PluginFactory(
@Nullable PluginConfiguration pluginConfiguration,
@Nonnull List<ClassLoader> classLoaders,
@Nonnull List<AspectPayloadValidator> aspectPayloadValidators,
@Nonnull List<MutationHook> mutationHooks,
@Nonnull List<MCLSideEffect> mclSideEffects,
@Nonnull List<MCPSideEffect> mcpSideEffects) {
this.classLoaders = classLoaders;
this.pluginConfiguration =
pluginConfiguration == null ? PluginConfiguration.EMPTY : pluginConfiguration;
this.aspectPayloadValidators = applyDisable(aspectPayloadValidators);
this.mutationHooks = applyDisable(mutationHooks);
this.mclSideEffects = applyDisable(mclSideEffects);
this.mcpSideEffects = applyDisable(mcpSideEffects);
}

public PluginFactory loadPlugins() {
this.aspectPayloadValidators = buildAspectPayloadValidators(this.pluginConfiguration);
this.mutationHooks = buildMutationHooks(this.pluginConfiguration);
this.mclSideEffects = buildMCLSideEffects(this.pluginConfiguration);
this.mcpSideEffects = buildMCPSideEffects(this.pluginConfiguration);
logSummary(
Stream.of(
this.aspectPayloadValidators,
this.mutationHooks,
this.mclSideEffects,
this.mcpSideEffects)
.flatMap(List::stream)
.collect(Collectors.toList()));
if (this.aspectPayloadValidators != null
|| this.mutationHooks != null
|| this.mclSideEffects != null
|| this.mcpSideEffects != null) {
log.error("Plugins are already loaded. Re-building plugins will be skipped.");
} else {
this.aspectPayloadValidators = buildAspectPayloadValidators(this.pluginConfiguration);
this.mutationHooks = buildMutationHooks(this.pluginConfiguration);
this.mclSideEffects = buildMCLSideEffects(this.pluginConfiguration);
this.mcpSideEffects = buildMCPSideEffects(this.pluginConfiguration);
logSummary(
Stream.of(
this.aspectPayloadValidators,
this.mutationHooks,
this.mclSideEffects,
this.mcpSideEffects)
.flatMap(List::stream)
.collect(Collectors.toList()));
}
return this;
}

public boolean isEmpty() {
return this.pluginConfiguration.isEmpty()
&& Optional.ofNullable(this.aspectPayloadValidators).map(List::isEmpty).orElse(true)
&& Optional.ofNullable(this.mutationHooks).map(List::isEmpty).orElse(true)
&& Optional.ofNullable(this.mclSideEffects).map(List::isEmpty).orElse(true)
&& Optional.ofNullable(this.mcpSideEffects).map(List::isEmpty).orElse(true);
}

public boolean hasLoadedPlugins() {
return Stream.of(
this.aspectPayloadValidators,
this.mutationHooks,
this.mcpSideEffects,
this.mcpSideEffects)
.anyMatch(Objects::nonNull);
}

private void logSummary(List<PluginSpec> pluginSpecs) {
if (!pluginSpecs.isEmpty()) {
log.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ public class PluginConfiguration {

public static PluginConfiguration EMPTY = new PluginConfiguration();

public boolean isEmpty() {
return aspectPayloadValidators.isEmpty()
&& mutationHooks.isEmpty()
&& mclSideEffects.isEmpty()
&& mcpSideEffects.isEmpty();
}

public static PluginConfiguration merge(PluginConfiguration a, PluginConfiguration b) {
return new PluginConfiguration(
Stream.concat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,60 @@ public void tripleMergeWithDisabled() throws EntityRegistryException {
.count(),
0);
}

@Test
public void testEmptyMerges() throws EntityRegistryException {
ConfigEntityRegistry configEntityRegistry1 =
new ConfigEntityRegistry(
TestEntityProfile.class.getClassLoader().getResourceAsStream(REGISTRY_FILE_1));
ConfigEntityRegistry emptyEntityRegistry =
new ConfigEntityRegistry(
TestEntityProfile.class.getClassLoader().getResourceAsStream(REGISTRY_FILE_2),
(config, classLoaders) -> PluginFactory.empty());

MergedEntityRegistry mergedEntityRegistry = new MergedEntityRegistry(configEntityRegistry1);
mergedEntityRegistry.apply(emptyEntityRegistry);
assertEquals(mergedEntityRegistry.getPluginFactory(), configEntityRegistry1.getPluginFactory());

MergedEntityRegistry mergedEntityRegistry2 = new MergedEntityRegistry(emptyEntityRegistry);
mergedEntityRegistry2.apply(configEntityRegistry1);
assertEquals(
mergedEntityRegistry2.getPluginFactory(), configEntityRegistry1.getPluginFactory());
}

@Test
public void testUnloadedMerge() throws EntityRegistryException {
ConfigEntityRegistry configEntityRegistry1 =
new ConfigEntityRegistry(
TestEntityProfile.class.getClassLoader().getResourceAsStream(REGISTRY_FILE_1),
(config, classLoaders) -> new PluginFactory(config, classLoaders));
ConfigEntityRegistry configEntityRegistry2 =
new ConfigEntityRegistry(
TestEntityProfile.class.getClassLoader().getResourceAsStream(REGISTRY_FILE_2),
(config, classLoaders) -> new PluginFactory(config, classLoaders));

MergedEntityRegistry mergedEntityRegistry = new MergedEntityRegistry(configEntityRegistry1);
mergedEntityRegistry.apply(configEntityRegistry2);

assertEquals(
mergedEntityRegistry.getAllAspectPayloadValidators().stream()
.filter(p -> p.getConfig().getSupportedOperations().contains("DELETE"))
.count(),
1);
assertEquals(
mergedEntityRegistry.getAllMutationHooks().stream()
.filter(p -> p.getConfig().getSupportedOperations().contains("DELETE"))
.count(),
1);
assertEquals(
mergedEntityRegistry.getAllMCLSideEffects().stream()
.filter(p -> p.getConfig().getSupportedOperations().contains("DELETE"))
.count(),
1);
assertEquals(
mergedEntityRegistry.getAllMCPSideEffects().stream()
.filter(p -> p.getConfig().getSupportedOperations().contains("DELETE"))
.count(),
1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,21 @@
SNOWFLAKE_COST_TABLE = "costs"
SNOWFLAKE_PROCESSED_TABLE = "processed_costs"


def _fake_snowflake_execute(*args, **kwargs):
raise ValueError("mocked snowflake execute to not run queries")


with DAG(
"snowflake_operator",
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False,
) as dag:
# HACK: We don't want to send real requests to Snowflake. As a workaround,
# we can simply monkey-patch the operator.
SnowflakeOperator.execute = _fake_snowflake_execute # type: ignore

transform_cost_table = SnowflakeOperator(
snowflake_conn_id="my_snowflake",
task_id="transform_cost_table",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,12 +685,7 @@ def table_upstreams_with_column_lineage(
t.query_start_time AS query_start_time,
t.query_id AS query_id
FROM
(
SELECT * from snowflake.account_usage.access_history
WHERE
query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
AND query_start_time < to_timestamp_ltz({end_time_millis}, 3)
) t,
snowflake.account_usage.access_history t,
lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) r,
lateral flatten(input => t.OBJECTS_MODIFIED) w,
lateral flatten(input => w.value : "columns", outer => true) wcols,
Expand Down Expand Up @@ -780,12 +775,14 @@ def table_upstreams_with_column_lineage(
queries AS (
select qid.downstream_table_name, qid.query_id, query_history.query_text, query_history.start_time
from query_ids qid
JOIN (
LEFT JOIN (
SELECT * FROM snowflake.account_usage.query_history
WHERE query_history.start_time >= to_timestamp_ltz({start_time_millis}, 3)
AND query_history.start_time < to_timestamp_ltz({end_time_millis}, 3)
) query_history
on qid.query_id = query_history.query_id
WHERE qid.query_id is not null
AND query_history.query_text is not null
)
SELECT
h.downstream_table_name AS "DOWNSTREAM_TABLE_NAME",
Expand Down Expand Up @@ -850,12 +847,7 @@ def table_upstreams_only(
t.query_start_time AS query_start_time,
t.query_id AS query_id
FROM
(
SELECT * from snowflake.account_usage.access_history
WHERE
query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
AND query_start_time < to_timestamp_ltz({end_time_millis}, 3)
) t,
snowflake.account_usage.access_history t,
lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) r,
lateral flatten(input => t.OBJECTS_MODIFIED) w
WHERE
Expand Down
8 changes: 6 additions & 2 deletions metadata-service/plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@ dependencies {
implementation externalDependency.jacksonDataFormatYaml
implementation externalDependency.jacksonJDK8
implementation externalDependency.jacksonDataPropertyFormat
implementation externalDependency.logbackClassic;
implementation externalDependency.logbackClassic
implementation externalDependency.slf4jApi

compileOnly externalDependency.lombok
annotationProcessor externalDependency.lombok

testImplementation project(':test-models')
testImplementation project(path: ':test-models', configuration: 'testDataTemplate')
testImplementation externalDependency.mockito
testImplementation externalDependency.testng
annotationProcessor externalDependency.lombok
testCompileOnly externalDependency.lombok
testAnnotationProcessor externalDependency.lombok
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,27 @@ protected <T extends PluginSpec> List<T> build(
try {
Class<?> clazz = classLoader.loadClass(config.getClassName());

final T plugin;
final List<T> plugins;
if (config.getSpring().getName() == null) {
plugin = (T) springApplicationContext.getBean(clazz);
plugins =
springApplicationContext.getBeansOfType(clazz).values().stream()
.map(plugin -> (T) plugin)
.collect(Collectors.toList());
} else {
plugin = (T) springApplicationContext.getBean(config.getSpring().getName(), clazz);
plugins =
List.of((T) springApplicationContext.getBean(config.getSpring().getName(), clazz));
}

if (plugin.enabled()) {
result.add((T) plugin.setConfig(config));
}
plugins.stream()
.filter(plugin -> plugin.enabled())
.forEach(
plugin -> {
if (plugin.getConfig() != null) {
result.add(plugin);
} else {
result.add((T) plugin.setConfig(config));
}
});

loaded = true;
break;
Expand Down
Loading

0 comments on commit 07bef8d

Please sign in to comment.