Skip to content

Commit

Permalink
fix(ingest/kafka-connect): update connection test url
Browse files Browse the repository at this point in the history
When using confluent cloud, GET does work on root but works on
/connectors endpoint
  • Loading branch information
mayurinehate committed Dec 10, 2024
1 parent 0a2ac70 commit c01cb59
Showing 1 changed file with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,9 @@ def get_parser(
transforms.append(transform)
for key in self.connector_manifest.config.keys():
if key.startswith(f"transforms.{name}."):
transform[
key.replace(f"transforms.{name}.", "")
] = self.connector_manifest.config[key]
transform[key.replace(f"transforms.{name}.", "")] = (
self.connector_manifest.config[key]
)

return self.JdbcParser(
db_connection_url,
Expand Down Expand Up @@ -428,9 +428,9 @@ def _extract_lineages(self):
self.connector_manifest.flow_property_bag = self.connector_manifest.config

# Mask/Remove properties that may reveal credentials
self.connector_manifest.flow_property_bag[
"connection.url"
] = parser.db_connection_url
self.connector_manifest.flow_property_bag["connection.url"] = (
parser.db_connection_url
)
if "connection.password" in self.connector_manifest.flow_property_bag:
del self.connector_manifest.flow_property_bag["connection.password"]
if "connection.user" in self.connector_manifest.flow_property_bag:
Expand Down Expand Up @@ -804,9 +804,9 @@ def get_parser(
transforms.append(transform)
for key in self.connector_manifest.config.keys():
if key.startswith(f"transforms.{name}."):
transform[
key.replace(f"transforms.{name}.", "")
] = self.connector_manifest.config[key]
transform[key.replace(f"transforms.{name}.", "")] = (
self.connector_manifest.config[key]
)

if "defaultDataset" in connector_manifest.config:
defaultDataset = connector_manifest.config["defaultDataset"]
Expand Down Expand Up @@ -1155,7 +1155,7 @@ def __init__(self, config: KafkaConnectSourceConfig, ctx: PipelineContext):
)
self.session.auth = (self.config.username, self.config.password)

test_response = self.session.get(f"{self.config.connect_uri}")
test_response = self.session.get(f"{self.config.connect_uri}/connectors")
test_response.raise_for_status()
logger.info(f"Connection to {self.config.connect_uri} is ok")
if not jpype.isJVMStarted():
Expand Down

0 comments on commit c01cb59

Please sign in to comment.