Skip to content

Commit

Permalink
elasticsearch flusher authentication fix tls config and http config (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
liangry authored and yyuuttaaoo committed Nov 8, 2023
1 parent 91a74ae commit 8b634df
Showing 1 changed file with 26 additions and 61 deletions.
87 changes: 26 additions & 61 deletions plugins/flusher/elasticsearch/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"fmt"
"net/http"
"os"
"reflect"
"time"

"github.com/elastic/go-elasticsearch/v8"
Expand Down Expand Up @@ -46,16 +45,38 @@ func (config *Authentication) ConfigureAuthenticationAndHTTP(httpcfg *HTTPConfig
return err
}
}

transport := &http.Transport{}
if config.TLS != nil {
if err := configureTLS(config.TLS, opts); err != nil {
return err
tlsConfig, err := config.TLS.LoadTLSConfig()
if err != nil {
return fmt.Errorf("error loading tls config: %w", err)
}
if tlsConfig != nil {
transport.TLSClientConfig = tlsConfig
}
if config.TLS.CAFile != "" {
opts.CACert, err = os.ReadFile(config.TLS.CAFile)
if err != nil {
return err
}
}
}
if httpcfg != nil {
if err := configureHTTP(httpcfg, opts); err != nil {
return err
if httpcfg.MaxIdleConnsPerHost != 0 {
transport.MaxIdleConnsPerHost = httpcfg.MaxIdleConnsPerHost
}
if httpcfg.ResponseHeaderTimeout != "" {
var unit time.Duration
unit, err := convertTimeUnit(httpcfg.ResponseHeaderTimeout)
if err != nil {
return err
}
transport.ResponseHeaderTimeout = unit
}
}
opts.Transport = transport

return nil
}

Expand All @@ -69,62 +90,6 @@ func (plainTextConfig *PlainTextConfig) ConfigurePlaintext(opts *elasticsearch.C
return nil
}

func configureTLS(config *tlscommon.TLSConfig, opts *elasticsearch.Config) error {
tlsConfig, err := config.LoadTLSConfig()
if err != nil {
return fmt.Errorf("error loading tls config: %w", err)
}
transport := &http.Transport{}
if tlsConfig != nil && tlsConfig.InsecureSkipVerify {
transport.TLSClientConfig = tlsConfig
}
if config.CAFile != "" {
opts.CACert, err = os.ReadFile(config.CAFile)
if err != nil {
return err
}
}
mergeTransports(opts, transport)
return nil
}

func configureHTTP(httpcfg *HTTPConfig, opts *elasticsearch.Config) error {
transport := &http.Transport{}
if httpcfg.MaxIdleConnsPerHost != 0 {
transport.MaxIdleConnsPerHost = httpcfg.MaxIdleConnsPerHost
}
if httpcfg.ResponseHeaderTimeout != "" {
var unit time.Duration
unit, err := convertTimeUnit(httpcfg.ResponseHeaderTimeout)
if err != nil {
return err
}
transport.ResponseHeaderTimeout = unit
}
mergeTransports(opts, transport)
return nil
}

func mergeTransports(opts *elasticsearch.Config, transport *http.Transport) {
if opts.Transport == nil {
opts.Transport = transport
return
}

existing, _ := opts.Transport.(*http.Transport)
// Use reflection to copy fields from transport to existing Transport
transportValue := reflect.ValueOf(transport).Elem()
existingValue := reflect.ValueOf(existing).Elem()
for i := 0; i < transportValue.NumField(); i++ {
field := transportValue.Type().Field(i)
fieldValue := transportValue.Field(i)
existingField := existingValue.FieldByName(field.Name)
if existingField.IsValid() && existingField.Type() == fieldValue.Type() {
existingField.Set(fieldValue)
}
}
}

func convertTimeUnit(unit string) (time.Duration, error) {
val, ok := timeUnitMap[unit]
if !ok {
Expand Down

0 comments on commit 8b634df

Please sign in to comment.