Skip to content

Commit

Permalink
Merge branch 'master' into master+ing-463-fix_unity-analyze-profiling
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate authored Dec 26, 2023
2 parents d256c9e + ed5bdfc commit 5628e6d
Show file tree
Hide file tree
Showing 15 changed files with 404 additions and 144 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import React from 'react';
import React, { useState } from 'react';
import { Link } from 'react-router-dom';
import styled from 'styled-components/macro';
import { message, Button, List, Typography } from 'antd';
import { LinkOutlined, DeleteOutlined } from '@ant-design/icons';
import { message, Button, List, Typography, Modal, Form, Input } from 'antd';
import { LinkOutlined, DeleteOutlined, EditOutlined } from '@ant-design/icons';
import { EntityType, InstitutionalMemoryMetadata } from '../../../../../../types.generated';
import { useEntityData } from '../../../EntityContext';
import { useEntityData, useMutationUrn } from '../../../EntityContext';
import { useEntityRegistry } from '../../../../../useEntityRegistry';
import { ANTD_GRAY } from '../../../constants';
import { formatDateString } from '../../../containers/profile/utils';
import { useRemoveLinkMutation } from '../../../../../../graphql/mutations.generated';
import { useAddLinkMutation, useRemoveLinkMutation } from '../../../../../../graphql/mutations.generated';
import analytics, { EntityActionType, EventType } from '../../../../../analytics';

const LinkListItem = styled(List.Item)`
border-radius: 5px;
Expand All @@ -33,10 +34,15 @@ type LinkListProps = {
};

export const LinkList = ({ refetch }: LinkListProps) => {
const { urn: entityUrn, entityData } = useEntityData();
const [editModalVisble, setEditModalVisible] = useState(false);
const [linkDetails, setLinkDetails] = useState<InstitutionalMemoryMetadata | undefined>(undefined);
const { urn: entityUrn, entityData, entityType } = useEntityData();
const entityRegistry = useEntityRegistry();
const [removeLinkMutation] = useRemoveLinkMutation();
const links = entityData?.institutionalMemory?.elements || [];
const [form] = Form.useForm();
const [addLinkMutation] = useAddLinkMutation();
const mutationUrn = useMutationUrn();

const handleDeleteLink = async (metadata: InstitutionalMemoryMetadata) => {
try {
Expand All @@ -53,18 +59,113 @@ export const LinkList = ({ refetch }: LinkListProps) => {
refetch?.();
};

const handleEditLink = (metadata: InstitutionalMemoryMetadata) => {
form.setFieldsValue({
url: metadata.url,
label: metadata.description,
});
setLinkDetails(metadata);
setEditModalVisible(true);
};

const handleClose = () => {
form.resetFields();
setEditModalVisible(false);
};

const handleEdit = async (formData: any) => {
if (!linkDetails) return;
try {
await removeLinkMutation({
variables: { input: { linkUrl: linkDetails.url, resourceUrn: linkDetails.associatedUrn || entityUrn } },
});
await addLinkMutation({
variables: { input: { linkUrl: formData.url, label: formData.label, resourceUrn: mutationUrn } },
});

message.success({ content: 'Link Updated', duration: 2 });

analytics.event({
type: EventType.EntityActionEvent,
entityType,
entityUrn: mutationUrn,
actionType: EntityActionType.UpdateLinks,
});

refetch?.();
handleClose();
} catch (e: unknown) {
message.destroy();

if (e instanceof Error) {
message.error({ content: `Error updating link: \n ${e.message || ''}`, duration: 2 });
}
}
};

return entityData ? (
<>
<Modal
title="Edit Link"
visible={editModalVisble}
destroyOnClose
onCancel={handleClose}
footer={[
<Button type="text" onClick={handleClose}>
Cancel
</Button>,
<Button form="editLinkForm" key="submit" htmlType="submit">
Edit
</Button>,
]}
>
<Form form={form} name="editLinkForm" onFinish={handleEdit} layout="vertical">
<Form.Item
name="url"
label="URL"
rules={[
{
required: true,
message: 'A URL is required.',
},
{
type: 'url',
warningOnly: true,
message: 'This field must be a valid url.',
},
]}
>
<Input placeholder="https://" autoFocus />
</Form.Item>
<Form.Item
name="label"
label="Label"
rules={[
{
required: true,
message: 'A label is required.',
},
]}
>
<Input placeholder="A short label for this link" />
</Form.Item>
</Form>
</Modal>
{links.length > 0 && (
<List
size="large"
dataSource={links}
renderItem={(link) => (
<LinkListItem
extra={
<Button onClick={() => handleDeleteLink(link)} type="text" shape="circle" danger>
<DeleteOutlined />
</Button>
<>
<Button onClick={() => handleEditLink(link)} type="text" shape="circle" danger>
<EditOutlined />
</Button>
<Button onClick={() => handleDeleteLink(link)} type="text" shape="circle" danger>
<DeleteOutlined />
</Button>
</>
}
>
<List.Item.Meta
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Button, Form, Switch, Typography } from 'antd';
import { Button, Checkbox, Form, Input, Switch, Typography } from 'antd';
import React, { useMemo, useState } from 'react';
import { Cron } from 'react-js-cron';
import 'react-js-cron/dist/styles.css';
Expand Down Expand Up @@ -31,6 +31,10 @@ const CronText = styled(Typography.Paragraph)`
color: ${ANTD_GRAY[7]};
`;

const AdvancedCheckBox = styled(Typography.Text)`
margin-right: 10px;
margin-bottom: 8px;
`;
const CronSuccessCheck = styled(CheckCircleOutlined)`
color: ${REDESIGN_COLORS.BLUE};
margin-right: 4px;
Expand Down Expand Up @@ -68,8 +72,8 @@ export const CreateScheduleStep = ({ state, updateState, goTo, prev }: StepProps
const { schedule } = state;
const interval = schedule?.interval?.replaceAll(', ', ' ') || DAILY_MIDNIGHT_CRON_INTERVAL;
const timezone = schedule?.timezone || Intl.DateTimeFormat().resolvedOptions().timeZone;

const [scheduleEnabled, setScheduleEnabled] = useState(!!schedule);
const [advancedCronCheck, setAdvancedCronCheck] = useState(false);
const [scheduleCronInterval, setScheduleCronInterval] = useState(interval);
const [scheduleTimezone, setScheduleTimezone] = useState(timezone);

Expand Down Expand Up @@ -137,13 +141,29 @@ export const CreateScheduleStep = ({ state, updateState, goTo, prev }: StepProps
)}
</Form.Item>
<StyledFormItem required label={<Typography.Text strong>Schedule</Typography.Text>}>
<Cron
value={scheduleCronInterval}
setValue={setScheduleCronInterval}
clearButton={false}
className="cron-builder"
leadingZero
/>
<div style={{ paddingBottom: 10, paddingLeft: 10 }}>
<AdvancedCheckBox type="secondary">Advanced</AdvancedCheckBox>
<Checkbox
checked={advancedCronCheck}
onChange={(event) => setAdvancedCronCheck(event.target.checked)}
/>
</div>
{advancedCronCheck ? (
<Input
placeholder={DAILY_MIDNIGHT_CRON_INTERVAL}
autoFocus
value={scheduleCronInterval}
onChange={(e) => setScheduleCronInterval(e.target.value)}
/>
) : (
<Cron
value={scheduleCronInterval}
setValue={setScheduleCronInterval}
clearButton={false}
className="cron-builder"
leadingZero
/>
)}
<CronText>
{cronAsText.error && <>Invalid cron schedule. Cron must be of UNIX form:</>}
{!cronAsText.text && (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ export const SelectTemplateStep = ({ state, updateState, goTo, cancel, ingestion
};

const filteredSources = ingestionSources.filter(
(source) => source.displayName.includes(searchFilter) || source.name.includes(searchFilter),
(source) =>
source.displayName.toLocaleLowerCase().includes(searchFilter.toLocaleLowerCase()) ||
source.name.toLocaleLowerCase().includes(searchFilter.toLocaleLowerCase()),
);

return (
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
sqlglot_lib = {
# Using an Acryl fork of sqlglot.
# https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:hsheth?expand=1
"acryl-sqlglot==19.0.2.dev10",
"acryl-sqlglot==20.4.1.dev14",
}

sql_common = (
Expand Down
74 changes: 46 additions & 28 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
UpstreamLineageClass,
)
from datahub.utilities import memory_footprint
from datahub.utilities.dedup_list import deduplicate_list
from datahub.utilities.urns import dataset_urn

logger: logging.Logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -85,6 +86,30 @@ def __post_init__(self):
else:
self.dataset_lineage_type = DatasetLineageTypeClass.TRANSFORMED

def merge_lineage(
self,
upstreams: Set[LineageDataset],
cll: Optional[List[sqlglot_l.ColumnLineageInfo]],
) -> None:
self.upstreams = self.upstreams.union(upstreams)

# Merge CLL using the output column name as the merge key.
self.cll = self.cll or []
existing_cll: Dict[str, sqlglot_l.ColumnLineageInfo] = {
c.downstream.column: c for c in self.cll
}
for c in cll or []:
if c.downstream.column in existing_cll:
# Merge using upstream + column name as the merge key.
existing_cll[c.downstream.column].upstreams = deduplicate_list(
[*existing_cll[c.downstream.column].upstreams, *c.upstreams]
)
else:
# New output column, just add it as is.
self.cll.append(c)

self.cll = self.cll or None


class RedshiftLineageExtractor:
def __init__(
Expand Down Expand Up @@ -161,7 +186,12 @@ def _get_sources_from_query(
)
sources.append(source)

return sources, parsed_result.column_lineage
return (
sources,
parsed_result.column_lineage
if self.config.include_view_column_lineage
else None,
)

def _build_s3_path_from_row(self, filename: str) -> str:
path = filename.strip()
Expand Down Expand Up @@ -208,7 +238,7 @@ def _get_sources(
"Only s3 source supported with copy. The source was: {path}."
)
self.report.num_lineage_dropped_not_support_copy_path += 1
return sources, cll
return [], None
path = strip_s3_prefix(self._get_s3_path(path))
urn = make_dataset_urn_with_platform_instance(
platform=platform.value,
Expand Down Expand Up @@ -284,7 +314,6 @@ def _populate_lineage_map(
ddl=lineage_row.ddl,
filename=lineage_row.filename,
)
target.cll = cll

target.upstreams.update(
self._get_upstream_lineages(
Expand All @@ -294,13 +323,13 @@ def _populate_lineage_map(
raw_db_name=raw_db_name,
)
)
target.cll = cll

# Merging downstreams if dataset already exists and has downstreams
# Merging upstreams if dataset already exists and has upstreams
if target.dataset.urn in self._lineage_map:
self._lineage_map[target.dataset.urn].upstreams = self._lineage_map[
target.dataset.urn
].upstreams.union(target.upstreams)

self._lineage_map[target.dataset.urn].merge_lineage(
upstreams=target.upstreams, cll=target.cll
)
else:
self._lineage_map[target.dataset.urn] = target

Expand Down Expand Up @@ -420,23 +449,21 @@ def populate_lineage(
) -> None:
populate_calls: List[Tuple[str, LineageCollectorType]] = []

if self.config.table_lineage_mode == LineageMode.STL_SCAN_BASED:
if self.config.table_lineage_mode in {
LineageMode.STL_SCAN_BASED,
LineageMode.MIXED,
}:
# Populate table level lineage by getting upstream tables from stl_scan redshift table
query = RedshiftQuery.stl_scan_based_lineage_query(
self.config.database,
self.start_time,
self.end_time,
)
populate_calls.append((query, LineageCollectorType.QUERY_SCAN))
elif self.config.table_lineage_mode == LineageMode.SQL_BASED:
# Populate table level lineage by parsing table creating sqls
query = RedshiftQuery.list_insert_create_queries_sql(
db_name=database,
start_time=self.start_time,
end_time=self.end_time,
)
populate_calls.append((query, LineageCollectorType.QUERY_SQL_PARSER))
elif self.config.table_lineage_mode == LineageMode.MIXED:
if self.config.table_lineage_mode in {
LineageMode.SQL_BASED,
LineageMode.MIXED,
}:
# Populate table level lineage by parsing table creating sqls
query = RedshiftQuery.list_insert_create_queries_sql(
db_name=database,
Expand All @@ -445,15 +472,7 @@ def populate_lineage(
)
populate_calls.append((query, LineageCollectorType.QUERY_SQL_PARSER))

# Populate table level lineage by getting upstream tables from stl_scan redshift table
query = RedshiftQuery.stl_scan_based_lineage_query(
db_name=database,
start_time=self.start_time,
end_time=self.end_time,
)
populate_calls.append((query, LineageCollectorType.QUERY_SCAN))

if self.config.include_views:
if self.config.include_views and self.config.include_view_lineage:
# Populate table level lineage for views
query = RedshiftQuery.view_lineage_query()
populate_calls.append((query, LineageCollectorType.VIEW))
Expand Down Expand Up @@ -540,7 +559,6 @@ def get_lineage(
dataset_urn: str,
schema: RedshiftSchema,
) -> Optional[Tuple[UpstreamLineageClass, Dict[str, str]]]:

upstream_lineage: List[UpstreamClass] = []

cll_lineage: List[FineGrainedLineage] = []
Expand Down
Loading

0 comments on commit 5628e6d

Please sign in to comment.