-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(ingestion/oracle): Improved foreign key handling #11867
base: master
Are you sure you want to change the base?
Changes from 13 commits
9cc624f
b7e85bf
daa506b
f4a2e01
baae9a2
59c9b05
630a889
9f49414
d8fb4c8
17c01b4
f82529f
db56578
d385e95
f48ea6b
bec592d
e91e5cb
1fdeca3
f2ad6a1
5e2539e
fcae9f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -374,81 +374,90 @@ def _get_constraint_data( | |
) -> List[sqlalchemy.engine.Row]: | ||
params = {"table_name": table_name} | ||
|
||
# Simplified query that's more reliable with SQLAlchemy 1.4 | ||
text = ( | ||
"SELECT" | ||
"\nac.constraint_name," # 0 | ||
"\nac.constraint_type," # 1 | ||
"\nloc.column_name AS local_column," # 2 | ||
"\nrem.table_name AS remote_table," # 3 | ||
"\nrem.column_name AS remote_column," # 4 | ||
"\nrem.owner AS remote_owner," # 5 | ||
"\nloc.position as loc_pos," # 6 | ||
"\nrem.position as rem_pos," # 7 | ||
"\nac.search_condition," # 8 | ||
"\nac.delete_rule" # 9 | ||
"\nFROM dba_constraints%(dblink)s ac," | ||
"\ndba_cons_columns%(dblink)s loc," | ||
"\ndba_cons_columns%(dblink)s rem" | ||
"\nWHERE ac.table_name = CAST(:table_name AS VARCHAR2(128))" | ||
"\nAND ac.constraint_type IN ('R','P', 'U', 'C')" | ||
"\nac.constraint_name," | ||
"\nac.constraint_type," | ||
"\nacc.column_name AS local_column," | ||
"\nNULL AS remote_table," | ||
"\nNULL AS remote_column," | ||
"\nNULL AS remote_owner," | ||
"\nacc.position AS loc_pos," | ||
"\nNULL AS rem_pos," | ||
"\nac.search_condition," | ||
"\nac.delete_rule" | ||
"\nFROM all_constraints ac" | ||
"\nJOIN all_cons_columns acc" | ||
"\nON ac.owner = acc.owner" | ||
"\nAND ac.constraint_name = acc.constraint_name" | ||
"\nAND ac.table_name = acc.table_name" | ||
"\nWHERE ac.table_name = :table_name" | ||
"\nAND ac.constraint_type IN ('P', 'U', 'C')" | ||
) | ||
|
||
if schema is not None: | ||
params["owner"] = schema | ||
text += "\nAND ac.owner = CAST(:owner AS VARCHAR2(128))" | ||
text += "\nAND ac.owner = :owner" | ||
|
||
# For foreign keys, join with the remote columns | ||
text += ( | ||
"\nAND ac.owner = loc.owner" | ||
"\nAND ac.constraint_name = loc.constraint_name" | ||
"\nAND ac.r_owner = rem.owner(+)" | ||
"\nAND ac.r_constraint_name = rem.constraint_name(+)" | ||
"\nAND (rem.position IS NULL or loc.position=rem.position)" | ||
"\nORDER BY ac.constraint_name, loc.position" | ||
"\nUNION ALL" | ||
"\nSELECT" | ||
"\nac.constraint_name," | ||
"\nac.constraint_type," | ||
"\nacc.column_name AS local_column," | ||
"\nac.r_table_name AS remote_table," | ||
"\nrcc.column_name AS remote_column," | ||
"\nac.r_owner AS remote_owner," | ||
"\nacc.position AS loc_pos," | ||
"\nrcc.position AS rem_pos," | ||
"\nac.search_condition," | ||
"\nac.delete_rule" | ||
"\nFROM all_constraints ac" | ||
"\nJOIN all_cons_columns acc" | ||
"\nON ac.owner = acc.owner" | ||
"\nAND ac.constraint_name = acc.constraint_name" | ||
"\nAND ac.table_name = acc.table_name" | ||
"\nLEFT JOIN all_cons_columns rcc" | ||
"\nON ac.r_owner = rcc.owner" | ||
"\nAND ac.r_constraint_name = rcc.constraint_name" | ||
"\nAND acc.position = rcc.position" | ||
"\nWHERE ac.table_name = :table_name" | ||
"\nAND ac.constraint_type = 'R'" | ||
) | ||
|
||
text = text % {"dblink": dblink} | ||
if schema is not None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
text += "\nAND ac.owner = :owner" | ||
|
||
text += "\nORDER BY constraint_name, loc_pos" | ||
|
||
rp = self._inspector_instance.bind.execute(sql.text(text), params) | ||
constraint_data = rp.fetchall() | ||
return constraint_data | ||
return list(rp.fetchall()) | ||
|
||
def get_pk_constraint( | ||
self, table_name: str, schema: Optional[str] = None, dblink: str = "" | ||
) -> Dict: | ||
denormalized_table_name = self._inspector_instance.dialect.denormalize_name( | ||
table_name | ||
) | ||
assert denormalized_table_name | ||
|
||
schema = self._inspector_instance.dialect.denormalize_name( | ||
schema or self.default_schema_name | ||
) | ||
|
||
if schema is None: | ||
schema = self._inspector_instance.dialect.default_schema_name | ||
|
||
pkeys = [] | ||
constraint_name = None | ||
constraint_data = self._get_constraint_data( | ||
denormalized_table_name, schema, dblink | ||
) | ||
|
||
for row in constraint_data: | ||
( | ||
cons_name, | ||
cons_type, | ||
local_column, | ||
remote_table, | ||
remote_column, | ||
remote_owner, | ||
) = row[0:2] + tuple( | ||
[self._inspector_instance.dialect.normalize_name(x) for x in row[2:6]] | ||
try: | ||
for row in self._get_constraint_data(table_name, schema, dblink): | ||
if row[1] == "P": # constraint_type is 'P' for primary key | ||
if constraint_name is None: | ||
constraint_name = ( | ||
self._inspector_instance.dialect.normalize_name(row[0]) | ||
) | ||
col_name = self._inspector_instance.dialect.normalize_name( | ||
row[2] | ||
) # local_column | ||
pkeys.append(col_name) | ||
except Exception as e: | ||
logger.error( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This should be using our structured reporting - see https://acryldata.notion.site/Error-reporting-in-ingestion-5343cc6ea0c84633b38070d1a409c569?pvs=4 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As a reporter hasn't been implemented at all in the Oracle connector, I imagine that we should do that, right? There are a number of other areas that are currently logged but not reported. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The Oracle source uses our SQL common source internally, which does do this reporting in most places. Overall, I'd like to opportunistically move towards structured logging whenever we make tweaks to connectors. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The reporter's errors/warnings have now been added. |
||
f"Error processing PK constraint data for {schema}.{table_name}: {str(e)}" | ||
) | ||
if cons_type == "P": | ||
if constraint_name is None: | ||
constraint_name = self._inspector_instance.dialect.normalize_name( | ||
cons_name | ||
) | ||
pkeys.append(local_column) | ||
# Return empty constraint if we can't process it | ||
return {"constrained_columns": [], "name": None} | ||
|
||
return {"constrained_columns": pkeys, "name": constraint_name} | ||
|
||
|
@@ -552,8 +561,8 @@ def get_view_definition( | |
text = "SELECT text FROM dba_views WHERE view_name=:view_name" | ||
|
||
if schema is not None: | ||
text += " AND owner = :schema" | ||
params["schema"] = schema | ||
params["owner"] = schema | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why is schema set as an owner? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In Oracle's DBA_VIEWS, the OWNER column represents the schema name that owns the view. |
||
text += "\nAND ac.owner = CAST(:owner AS VARCHAR2(128))" | ||
|
||
rp = self._inspector_instance.bind.execute(sql.text(text), params).scalar() | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you set these to NULL?
Aren't these NULL anyway if we omit selecting `R` constraints? (If I guess right and `R` means remote tables.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is true, but splitting this out into two UNION'ed queries does end up being a little more efficient on the Oracle side.