Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files (browse the repository at this point in the history)
  • Loading branch information
anshbansal authored May 8, 2024
2 parents c044257 + 9debbdd commit 4fe9ef4
Show file tree
Hide file tree
Showing 37 changed files with 1,706 additions and 357 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public static ExecutionRequest mapExecutionRequest(
inputResult.setArguments(StringMapMapper.map(context, executionRequestInput.getArgs()));
}
inputResult.setRequestedAt(executionRequestInput.getRequestedAt());
if (executionRequestInput.getActorUrn() != null) {
inputResult.setActorUrn(executionRequestInput.getActorUrn().toString());
}
result.setInput(inputResult);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.SetMode;
import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.graphql.QueryContext;
Expand Down Expand Up @@ -113,6 +114,7 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
execInput.setExecutorId(
ingestionSourceInfo.getConfig().getExecutorId(), SetMode.IGNORE_NULL);
execInput.setRequestedAt(System.currentTimeMillis());
execInput.setActorUrn(UrnUtils.getUrn(context.getActorUrn()));

Map<String, String> arguments = new HashMap<>();
String recipe = ingestionSourceInfo.getConfig().getRecipe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static com.linkedin.metadata.Constants.*;

import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
Expand Down Expand Up @@ -71,6 +72,7 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
execInput.setSource(new ExecutionRequestSource().setType(TEST_CONNECTION_SOURCE_NAME));
execInput.setExecutorId(DEFAULT_EXECUTOR_ID);
execInput.setRequestedAt(System.currentTimeMillis());
execInput.setActorUrn(UrnUtils.getUrn(context.getActorUrn()));

Map<String, String> arguments = new HashMap<>();
arguments.put(
Expand Down
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/ingestion.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ type ExecutionRequestInput {
The time at which the request was created
"""
requestedAt: Long!

"""
Urn of the actor who created this execution request
"""
actorUrn: String
}

"""
Expand Down
3 changes: 3 additions & 0 deletions datahub-web-react/src/graphql/ingestion.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ query listIngestionSources($input: ListIngestionSourcesInput!) {
id
input {
requestedAt
actorUrn
}
result {
status
Expand Down Expand Up @@ -76,6 +77,7 @@ query getIngestionSource($urn: String!, $runStart: Int, $runCount: Int) {
id
input {
requestedAt
actorUrn
source {
type
}
Expand All @@ -98,6 +100,7 @@ query getIngestionExecutionRequest($urn: String!) {
source {
type
}
actorUrn
arguments {
key
value
Expand Down
1 change: 1 addition & 0 deletions docker/datahub-gms/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ COPY war.war /datahub/datahub-gms/bin/war.war
COPY metadata-models/src/main/resources/entity-registry.yml /datahub/datahub-gms/resources/entity-registry.yml
COPY docker/datahub-gms/start.sh /datahub/datahub-gms/scripts/start.sh
COPY docker/datahub-gms/jetty.xml /datahub/datahub-gms/scripts/jetty.xml
COPY docker/datahub-gms/jetty-jmx.xml /datahub/datahub-gms/scripts/jetty-jmx.xml
COPY docker/monitoring/client-prometheus-config.yaml /datahub/datahub-gms/scripts/prometheus-config.yaml
RUN chmod +x /datahub/datahub-gms/scripts/start.sh

Expand Down
31 changes: 31 additions & 0 deletions docker/datahub-gms/jetty-jmx.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "https://www.eclipse.org/jetty/configure_10_0.dtd">

<!--
  Jetty XML configuration that attaches a JMX MBeanContainer to the server.
  The elements below are applied to the object with id "Server"
  (an org.eclipse.jetty.server.Server).
  NOTE(review): this only has an effect if the file is passed to Jetty as a
  config file and the Jetty JMX classes are on the classpath; confirm against
  the GMS start script.
-->
<Configure id="Server" class="org.eclipse.jetty.server.Server">

<!-- =========================================================== -->
<!-- Get the platform MBeanServer -->
<!-- Calls the static ManagementFactory.getPlatformMBeanServer() -->
<!-- and stores the result under id "MBeanServer" for reuse. -->
<!-- =========================================================== -->
<Call id="MBeanServer" class="java.lang.management.ManagementFactory"
name="getPlatformMBeanServer" />

<!-- =========================================================== -->
<!-- Initialize the Jetty MBeanContainer -->
<!-- Builds an MBeanContainer around the MBeanServer obtained -->
<!-- above and registers it on the Server through addBean. -->
<!-- =========================================================== -->
<Call name="addBean">
<Arg>
<New id="MBeanContainer" class="org.eclipse.jetty.jmx.MBeanContainer">
<Arg>
<Ref refid="MBeanServer" />
</Arg>
<!--
  Invoke beanAdded on the new container with an empty first argument and
  the SLF4J ILoggerFactory (LoggerFactory.getILoggerFactory()) as the second.
  NOTE(review): presumably this exposes the logging implementation as an
  MBean so it can be inspected over JMX; confirm against the Jetty docs.
-->
<Call name="beanAdded">
<Arg/>
<Arg>
<Get name="ILoggerFactory" class="org.slf4j.LoggerFactory"/>
</Arg>
</Call>
</New>
</Arg>
</Call>
</Configure>

2 changes: 2 additions & 0 deletions docker/datahub-gms/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ COMMON="
$OTEL_AGENT \
$PROMETHEUS_AGENT \
-jar /jetty-runner.jar \
--stats unsecure \
--jar jetty-util.jar \
--jar jetty-jmx.jar \
--config /datahub/datahub-gms/scripts/jetty.xml \
--config /datahub/datahub-gms/scripts/jetty-jmx.xml \
/datahub/datahub-gms/bin/war.war"

if [[ $SKIP_ELASTICSEARCH_CHECK != true ]]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,8 @@ def run_datajob(
dpi = DataProcessInstance.from_datajob(
datajob=datajob,
id=f"{dag.dag_id}_{ti.task_id}_{dag_run.run_id}",
clone_inlets=True,
clone_outlets=True,
clone_inlets=config is None or config.materialize_iolets,
clone_outlets=config is None or config.materialize_iolets,
)
job_property_bag: Dict[str, str] = {}
job_property_bag["run_id"] = str(dag_run.run_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,14 @@ def on_task_instance_running(

self.emitter.emit(operation_mcp)
logger.debug(f"Emitted Dataset Operation: {outlet}")
else:
if self.graph:
for outlet in datajob.outlets:
if not self.graph.exists(str(outlet)):
logger.warning(f"Dataset {str(outlet)} not materialized")
for inlet in datajob.inlets:
if not self.graph.exists(str(inlet)):
logger.warning(f"Dataset {str(inlet)} not materialized")

def on_task_instance_finish(
self, task_instance: "TaskInstance", status: InstanceRunResult
Expand Down
9 changes: 5 additions & 4 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
sqlglot_lib = {
# Using an Acryl fork of sqlglot.
# https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:hsheth?expand=1
"acryl-sqlglot==23.11.2.dev2",
"acryl-sqlglot[rs]==23.11.2.dev2",
}

classification_lib = {
Expand Down Expand Up @@ -193,8 +193,7 @@
*sql_common,
# https://github.com/snowflakedb/snowflake-sqlalchemy/issues/350
"snowflake-sqlalchemy>=1.4.3",
# See https://github.com/snowflakedb/snowflake-connector-python/pull/1348 for why 2.8.2 is blocked
"snowflake-connector-python!=2.8.2",
"snowflake-connector-python>=3.4.0",
"pandas",
"cryptography",
"msal",
Expand Down Expand Up @@ -374,7 +373,9 @@
# It's technically wrong for packages to depend on setuptools. However, it seems mlflow does it anyways.
"setuptools",
},
"mode": {"requests", "tenacity>=8.0.1"} | sqllineage_lib | sqlglot_lib,
"mode": {"requests", "python-liquid", "tenacity>=8.0.1"}
| sqllineage_lib
| sqlglot_lib,
"mongodb": {"pymongo[srv]>=3.11", "packaging"},
"mssql": sql_common | mssql_common,
"mssql-odbc": sql_common | mssql_common | {"pyodbc"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class BIContainerSubTypes(str, Enum):
QLIK_APP = "Qlik App"
SIGMA_WORKSPACE = "Sigma Workspace"
SIGMA_WORKBOOK = "Sigma Workbook"
MODE_COLLECTION = "Collection"


class JobContainerSubTypes(str, Enum):
Expand All @@ -64,3 +65,8 @@ class BIAssetSubTypes(str, Enum):
# PowerBI
POWERBI_TILE = "PowerBI Tile"
POWERBI_PAGE = "PowerBI Page"

# Mode
MODE_REPORT = "Report"
MODE_QUERY = "Query"
MODE_CHART = "Chart"
Loading

0 comments on commit 4fe9ef4

Please sign in to comment.