diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka_connect.py index 0b201278142e3a..48b65a0ef24181 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka_connect.py @@ -315,9 +315,9 @@ def get_parser( transforms.append(transform) for key in self.connector_manifest.config.keys(): if key.startswith(f"transforms.{name}."): - transform[ - key.replace(f"transforms.{name}.", "") - ] = self.connector_manifest.config[key] + transform[key.replace(f"transforms.{name}.", "")] = ( + self.connector_manifest.config[key] + ) return self.JdbcParser( db_connection_url, @@ -428,9 +428,9 @@ def _extract_lineages(self): self.connector_manifest.flow_property_bag = self.connector_manifest.config # Mask/Remove properties that may reveal credentials - self.connector_manifest.flow_property_bag[ - "connection.url" - ] = parser.db_connection_url + self.connector_manifest.flow_property_bag["connection.url"] = ( + parser.db_connection_url + ) if "connection.password" in self.connector_manifest.flow_property_bag: del self.connector_manifest.flow_property_bag["connection.password"] if "connection.user" in self.connector_manifest.flow_property_bag: @@ -804,9 +804,9 @@ def get_parser( transforms.append(transform) for key in self.connector_manifest.config.keys(): if key.startswith(f"transforms.{name}."): - transform[ - key.replace(f"transforms.{name}.", "") - ] = self.connector_manifest.config[key] + transform[key.replace(f"transforms.{name}.", "")] = ( + self.connector_manifest.config[key] + ) if "defaultDataset" in connector_manifest.config: defaultDataset = connector_manifest.config["defaultDataset"] @@ -1155,7 +1155,7 @@ def __init__(self, config: KafkaConnectSourceConfig, ctx: PipelineContext): ) self.session.auth = (self.config.username, self.config.password) - test_response = self.session.get(f"{self.config.connect_uri}") + test_response = self.session.get(f"{self.config.connect_uri}/connectors") test_response.raise_for_status() logger.info(f"Connection to {self.config.connect_uri} is ok") if not jpype.isJVMStarted():