Skip to content

Commit

Permalink
feat(ingest/datahub): use stream_results with mysql (datahub-project#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and llance committed Jan 13, 2025
1 parent 393491d commit c927056
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 19 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@
"mssql-odbc": sql_common | mssql_common | {"pyodbc"},
"mysql": mysql,
# mariadb should have same dependency as mysql
"mariadb": sql_common | {"pymysql>=1.0.2"},
"mariadb": sql_common | mysql,
"okta": {"okta~=1.7.0", "nest-asyncio"},
"oracle": sql_common | {"oracledb"},
"postgres": sql_common | postgres_common,
Expand Down
10 changes: 10 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/datahub/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from typing import Optional, Set

import pydantic
from pydantic import Field, root_validator

from datahub.configuration.common import AllowDenyPattern
Expand Down Expand Up @@ -119,3 +120,12 @@ def check_ingesting_data(cls, values):
" Please specify at least one of `database_connection` or `kafka_connection`, ideally both."
)
return values

@pydantic.validator("database_connection")
def validate_mysql_scheme(
cls, v: SQLAlchemyConnectionConfig
) -> SQLAlchemyConnectionConfig:
if "mysql" in v.scheme:
if v.scheme != "mysql+pymysql":
raise ValueError("For MySQL, the scheme must be mysql+pymysql.")
return v
Original file line number Diff line number Diff line change
Expand Up @@ -151,31 +151,17 @@ def execute_server_cursor(
self, query: str, params: Dict[str, Any]
) -> Iterable[Dict[str, Any]]:
with self.engine.connect() as conn:
if self.engine.dialect.name == "postgresql":
if self.engine.dialect.name in ["postgresql", "mysql", "mariadb"]:
with conn.begin(): # Transaction required for PostgreSQL server-side cursor
# Note that stream_results=True is mainly supported by PostgreSQL and MySQL-based dialects.
# https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Connection.execution_options.params.stream_results
conn = conn.execution_options(
stream_results=True,
yield_per=self.config.database_query_batch_size,
)
result = conn.execute(query, params)
for row in result:
yield dict(row)
elif self.engine.dialect.name == "mysql": # MySQL
import MySQLdb

with contextlib.closing(
conn.connection.cursor(MySQLdb.cursors.SSCursor)
) as cursor:
logger.debug(f"Using Cursor type: {cursor.__class__.__name__}")
cursor.execute(query, params)

columns = [desc[0] for desc in cursor.description]
while True:
rows = cursor.fetchmany(self.config.database_query_batch_size)
if not rows:
break # Use break instead of return in generator
for row in rows:
yield dict(zip(columns, row))
else:
raise ValueError(f"Unsupported dialect: {self.engine.dialect.name}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def _get_database_workunits(
self._commit_progress(i)

def _get_kafka_workunits(
self, from_offsets: Dict[int, int], soft_deleted_urns: List[str] = []
self, from_offsets: Dict[int, int], soft_deleted_urns: List[str]
) -> Iterable[MetadataWorkUnit]:
if self.config.kafka_connection is None:
return
Expand Down

0 comments on commit c927056

Please sign in to comment.