Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Checks for corruption earlier and always report errors #5227

Merged
merged 9 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent)) {
return;
}
if (us.currentTablet == null
&& (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) {
if (us.currentTablet == null && (us.failures.containsKey(keyExtent)
|| us.authFailures.containsKey(keyExtent) || us.unhandledException != null)) {
// if there were previous failures, then do not accept additional writes
return;
}
Expand Down Expand Up @@ -339,6 +339,11 @@ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
for (TMutation tmutation : tmutations) {
Mutation mutation = new ServerMutation(tmutation);
// Deserialize the mutation in an attempt to check for data corruption that happened on
// the network. This will avoid writing a corrupt mutation to the write ahead log and
// failing after its written to the write ahead log when it is deserialized to update the
// in memory map.
mutation.getUpdates();
mutations.add(mutation);
additionalMutationSize += mutation.numBytes();
}
Expand All @@ -358,6 +363,15 @@ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
}
}
}
} catch (RuntimeException e) {
// This method is a thrift oneway method so an exception from it will not make it back to the
// client. Need to record the exception and set the session such that any future updates to
// the session are ignored.
us.unhandledException = e;
us.currentTablet = null;

// Rethrowing it will cause logging from thrift, so not adding logging here.
throw e;
} finally {
if (reserved) {
server.sessionManager.unreserveSession(us);
Expand Down Expand Up @@ -536,6 +550,20 @@ public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDE
}

try {
if (us.unhandledException != null) {
// Since flush() is not being called, any memory added to the global queued mutations
// counter will not be decremented. So do that here before throwing an exception.
server.updateTotalQueuedMutationSize(-us.queuedMutationSize);
us.queuedMutationSize = 0;
// make this memory available for GC
us.queuedMutations.clear();

// Something unexpected happened during this write session, so throw an exception here to
// cause a TApplicationException on the client side.
throw new IllegalStateException(
"Write session " + updateID + " saw an unexpected exception", us.unhandledException);
}

// clients may or may not see data from an update session while
// it is in progress, however when the update session is closed
// want to ensure that reads wait for the write to finish
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class UpdateSession extends Session {
public long flushTime = 0;
public long queuedMutationSize = 0;
public final Durability durability;
public Exception unhandledException = null;

public UpdateSession(TservConstraintEnv env, TCredentials credentials, Durability durability) {
super(credentials);
Expand Down
149 changes: 149 additions & 0 deletions test/src/main/java/org/apache/accumulo/test/CorruptMutationIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TMutation;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.tabletserver.thrift.TDurability;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TServiceClient;
import org.junit.jupiter.api.Test;

public class CorruptMutationIT extends AccumuloClusterHarness {

@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "10");
}

@Test
public void testCorruptMutation() throws Exception {

String table = getUniqueNames(1)[0];
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
c.tableOperations().create(table);
try (BatchWriter writer = c.createBatchWriter(table)) {
Mutation m = new Mutation("1");
m.put("f1", "q1", new Value("v1"));
writer.addMutation(m);
}

var ctx = (ClientContext) c;
var tableId = ctx.getTableId(table);
var extent = new KeyExtent(tableId, null, null);
var tabletMetadata = ctx.getAmple().readTablet(extent, TabletMetadata.ColumnType.LOCATION);
var location = tabletMetadata.getLocation();
assertNotNull(location);
assertEquals(TabletMetadata.LocationType.CURRENT, location.getType());

TabletClientService.Iface client =
ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, location.getHostAndPort(), ctx);
// Make the same RPC calls made by the BatchWriter, but pass a corrupt serialized mutation in
// this try block.
try {
TInfo tinfo = TraceUtil.traceInfo();

long sessionId = client.startUpdate(tinfo, ctx.rpcCreds(), TDurability.DEFAULT);

// Write two valid mutations to the session. The tserver buffers data it receives via
// applyUpdates and may not write them until closeUpdate RPC is called. Because
// TSERV_TOTAL_MUTATION_QUEUE_MAX was set so small, these values should be written.
client.applyUpdates(tinfo, sessionId, extent.toThrift(),
List.of(createTMutation("abc", "z1"), createTMutation("def", "z2")));

// Simulate data corruption in the serialized mutation
TMutation badMutation = createTMutation("ghi", "z3");
badMutation.entries = -42;

// Write some good and bad mutations to the session. The server side will see an error here,
// however since this is a thrift oneway method no exception is expected here. This should
// leave the session in a broken state where it no longer accepts any new data.
client.applyUpdates(tinfo, sessionId, extent.toThrift(),
List.of(createTMutation("jkl", "z4"), badMutation, createTMutation("mno", "z5")));

// Write two more valid mutations to the session, these should be dropped on the server side
// because of the previous error. So should never see these updates.
client.applyUpdates(tinfo, sessionId, extent.toThrift(),
List.of(createTMutation("pqr", "z6"), createTMutation("stu", "z7")));

dlmarion marked this conversation as resolved.
Show resolved Hide resolved
// Since client.applyUpdates experienced an error, should see an error when closing the
// session.
assertThrows(TApplicationException.class, () -> client.closeUpdate(tinfo, sessionId));
} finally {
ThriftUtil.returnClient((TServiceClient) client, ctx);
}

// The values that a scan must see
Set<String> expectedValues = Set.of("v1", "v2", "z1", "z2");

// The failed mutation should not have left the tablet in a bad state. Do some follow-on
// actions to ensure the tablet is still functional.
try (BatchWriter writer = c.createBatchWriter(table)) {
Mutation m = new Mutation("2");
m.put("f1", "q1", new Value("v2"));
writer.addMutation(m);
}

try (Scanner scanner = c.createScanner(table)) {
var valuesSeen =
scanner.stream().map(e -> e.getValue().toString()).collect(Collectors.toSet());
assertEquals(expectedValues, valuesSeen);
}

c.tableOperations().flush(table, null, null, true);

try (Scanner scanner = c.createScanner(table)) {
var valuesSeen =
scanner.stream().map(e -> e.getValue().toString()).collect(Collectors.toSet());
assertEquals(expectedValues, valuesSeen);
}
}
}

private static TMutation createTMutation(String row, String value) {
Mutation m = new Mutation(row);
m.put("x", "y", value);
return m.toThrift();
}
}
Loading