Skip to content

Commit

Permalink
[FLINK-33370][connectors/elasticsearch] Simplify validateAndParseHost…
Browse files Browse the repository at this point in the history
…sString in Elasticsearch connector's configuration
  • Loading branch information
TanYuxin-tyx committed Nov 10, 2023
1 parent 161b615 commit eeec60d
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.elasticsearch.sink.FlushBackoffType;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchCommonUtils;

import org.apache.http.HttpHost;

Expand Down Expand Up @@ -122,37 +122,11 @@ public Optional<Duration> getSocketTimeout() {

public List<HttpHost> getHosts() {
return config.get(HOSTS_OPTION).stream()
.map(ElasticsearchConfiguration::validateAndParseHostsString)
.map(ElasticsearchCommonUtils::validateAndParseHostsString)
.collect(Collectors.toList());
}

public Optional<Integer> getParallelism() {
return config.getOptional(SINK_PARALLELISM);
}

private static HttpHost validateAndParseHostsString(String host) {
try {
HttpHost httpHost = HttpHost.create(host);
if (httpHost.getPort() < 0) {
throw new ValidationException(
String.format(
"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.",
host, HOSTS_OPTION.key()));
}

if (httpHost.getSchemeName() == null) {
throw new ValidationException(
String.format(
"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.",
host, HOSTS_OPTION.key()));
}
return httpHost;
} catch (Exception e) {
throw new ValidationException(
String.format(
"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
host, HOSTS_OPTION.key()),
e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.elasticsearch.util;

import org.apache.flink.table.api.ValidationException;

import org.apache.http.HttpHost;

import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;

/** Common utilities for Elasticsearch connector, e.g. the configuration utils, etc. */
public class ElasticsearchCommonUtils {

/**
* Parse Hosts String to list.
*
* <p>Hosts String format was given as following:
*
* <pre>
* connector.hosts = http://host_name:9092;http://host_name:9093
* </pre>
*/
public static HttpHost validateAndParseHostsString(String host) {
try {
HttpHost httpHost = HttpHost.create(host);
if (httpHost.getPort() < 0) {
throw new ValidationException(
String.format(
"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.",
host, HOSTS_OPTION.key()));
}

if (httpHost.getSchemeName() == null) {
throw new ValidationException(
String.format(
"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.",
host, HOSTS_OPTION.key()));
}
return httpHost;
} catch (Exception e) {
throw new ValidationException(
String.format(
"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
host, HOSTS_OPTION.key()),
e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchCommonUtils;

import org.apache.http.HttpHost;

Expand All @@ -38,42 +38,7 @@ final class Elasticsearch6Configuration extends ElasticsearchConfiguration {

public List<HttpHost> getHosts() {
return config.get(HOSTS_OPTION).stream()
.map(Elasticsearch6Configuration::validateAndParseHostsString)
.map(ElasticsearchCommonUtils::validateAndParseHostsString)
.collect(Collectors.toList());
}

/**
* Parse Hosts String to list.
*
* <p>Hosts String format was given as following:
*
* <pre>
* connector.hosts = http://host_name:9092;http://host_name:9093
* </pre>
*/
private static HttpHost validateAndParseHostsString(String host) {
try {
HttpHost httpHost = HttpHost.create(host);
if (httpHost.getPort() < 0) {
throw new ValidationException(
String.format(
"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.",
host, HOSTS_OPTION.key()));
}

if (httpHost.getSchemeName() == null) {
throw new ValidationException(
String.format(
"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.",
host, HOSTS_OPTION.key()));
}
return httpHost;
} catch (Exception e) {
throw new ValidationException(
String.format(
"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
host, HOSTS_OPTION.key()),
e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchCommonUtils;

import org.apache.http.HttpHost;

Expand All @@ -38,33 +38,7 @@ final class Elasticsearch7Configuration extends ElasticsearchConfiguration {

public List<HttpHost> getHosts() {
return config.get(HOSTS_OPTION).stream()
.map(Elasticsearch7Configuration::validateAndParseHostsString)
.map(ElasticsearchCommonUtils::validateAndParseHostsString)
.collect(Collectors.toList());
}

private static HttpHost validateAndParseHostsString(String host) {
try {
HttpHost httpHost = HttpHost.create(host);
if (httpHost.getPort() < 0) {
throw new ValidationException(
String.format(
"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.",
host, HOSTS_OPTION.key()));
}

if (httpHost.getSchemeName() == null) {
throw new ValidationException(
String.format(
"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.",
host, HOSTS_OPTION.key()));
}
return httpHost;
} catch (Exception e) {
throw new ValidationException(
String.format(
"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
host, HOSTS_OPTION.key()),
e);
}
}
}

0 comments on commit eeec60d

Please sign in to comment.