Skip to content

Commit

Permalink
systemtest: Disable ignore malformed in mapping (#14538)
Browse files Browse the repository at this point in the history
* systemtest: disable ignore malformed mode

* systemtest: refactor move init to main test func
  • Loading branch information
1pkg authored Nov 7, 2024
1 parent 9109f21 commit 3fce7d3
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 22 deletions.
2 changes: 1 addition & 1 deletion systemtest/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var (
systemtestDir string
)

func init() {
func initContainers() {
_, filename, _, ok := runtime.Caller(0)
if !ok {
panic("could not locate systemtest directory")
Expand Down
2 changes: 1 addition & 1 deletion systemtest/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ var (
Elasticsearch *espoll.Client
)

func init() {
func initElasticSearch() {
cfg := newElasticsearchConfig()
cfg.Username = adminElasticsearchUser
cfg.Password = adminElasticsearchPass
Expand Down
30 changes: 30 additions & 0 deletions systemtest/intake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
package systemtest_test

import (
"context"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/apm-server/systemtest"
"github.com/elastic/apm-server/systemtest/apmservertest"
Expand Down Expand Up @@ -73,3 +78,28 @@ func TestIntake(t *testing.T) {
}

}

func TestIntakeMalformed(t *testing.T) {
// Setup a custom ingest pipeline to test a malformed data ingestion.
r, err := systemtest.Elasticsearch.Ingest.PutPipeline(
"traces-apm@custom",
strings.NewReader(`{"processors":[{"set":{"field":"span.duration.us","value":"poison"}}]}`),
)
require.NoError(t, err)
require.False(t, r.IsError())
defer systemtest.Elasticsearch.Ingest.DeletePipeline("traces-apm@custom")
// Test malformed intake data.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
srv := apmservertest.NewServerTB(t)
systemtest.CleanupElasticsearch(t)
response := systemtest.SendBackendEventsPayload(t, srv.URL, "../testdata/intake-v2/spans.ndjson")
_, err = systemtest.Elasticsearch.SearchIndexMinDocs(
ctx,
response.Accepted,
"traces-apm*",
nil,
espoll.WithTimeout(10*time.Second),
)
require.Error(t, err, "No traces should be indexed due to traces-apm@custom pipeline")
}
2 changes: 1 addition & 1 deletion systemtest/kibana.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ var (
IntegrationPackage *fleettest.Package
)

func init() {
func initKibana() {
kibanaConfig := apmservertest.DefaultConfig().Kibana
u, err := url.Parse(kibanaConfig.Host)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion systemtest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ import (

func TestMain(m *testing.M) {
log.Println("INFO: starting stack containers...")
initContainers()
if err := StartStackContainers(); err != nil {
log.Fatalf("failed to start stack containers: %v", err)
}

initElasticSearch()
initKibana()
initSettings()
initOTEL()
log.Println("INFO: running system tests...")
os.Exit(m.Run())
}
35 changes: 35 additions & 0 deletions systemtest/otlp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package systemtest

import "go.opentelemetry.io/otel"

var OtelErrors = make(chan error, 1)

func initOTEL() {
// otel.SetErrorHandler can only be called once per process.
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
if err == nil {
return
}
select {
case OtelErrors <- err:
default:
}
}))
}
20 changes: 2 additions & 18 deletions systemtest/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
semconv "go.opentelemetry.io/collector/semconv/v1.5.0"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
Expand All @@ -58,21 +57,6 @@ import (
"github.com/elastic/apm-tools/pkg/espoll"
)

var otelErrors = make(chan error, 1)

func init() {
// otel.SetErrorHandler can only be called once per process.
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
if err == nil {
return
}
select {
case otelErrors <- err:
default:
}
}))
}

func TestOTLPGRPCTraces(t *testing.T) {
systemtest.CleanupElasticsearch(t)
srv := apmservertest.NewServerTB(t)
Expand Down Expand Up @@ -623,7 +607,7 @@ func flushTracerProvider(ctx context.Context, tracerProvider *sdktrace.TracerPro
return err
}
select {
case err := <-otelErrors:
case err := <-systemtest.OtelErrors:
return err
default:
return nil
Expand Down Expand Up @@ -653,7 +637,7 @@ func sendOTLPMetrics(
return err
}
select {
case err := <-otelErrors:
case err := <-systemtest.OtelErrors:
return err
default:
return nil
Expand Down
55 changes: 55 additions & 0 deletions systemtest/settings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package systemtest

import (
"fmt"
"log"
"strings"
)

func initSettings() {
// Proactively test with more strict
// "ignore_malformed" mode by default.
for _, t := range []string{
"traces",
"metrics",
"logs-apm.error",
"logs-apm.app",
} {
if err := DisableIgnoreMalformed(t); err != nil {
log.Fatalf("failed to configure ignore_malformed %v", err)
}
}
}

// DisableIgnoreMalformed updates component template index setting
// to disable "ignore_malformed" inside mappings.
func DisableIgnoreMalformed(componentTemplate string) error {
r, err := Elasticsearch.Cluster.PutComponentTemplate(
fmt.Sprintf("%s@custom", componentTemplate),
strings.NewReader(`{"template":{"settings":{"index":{"mapping":{"ignore_malformed":"false"}}}}}`),
)
if err != nil {
return err
}
if r.IsError() {
return fmt.Errorf(`request to update "ignore_malformed":"false" failed for %s`, componentTemplate)
}
return nil
}

0 comments on commit 3fce7d3

Please sign in to comment.