Skip to content

Commit

Permalink
Merge branch '2.1' into 5198-prop-editor-set-context
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Jan 8, 2025
2 parents ec83554 + 8124a6b commit d513309
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ public class ZooInfoViewer implements KeywordExecutable {
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC));
private static final Logger log = LoggerFactory.getLogger(ZooInfoViewer.class);

private final NullWatcher nullWatcher =
new NullWatcher(new ReadyMonitor(ZooInfoViewer.class.getSimpleName(), 20_000L));
private NullWatcher nullWatcher;

private static final String INDENT = " ";

Expand All @@ -107,6 +106,7 @@ public String description() {

@Override
public void execute(String[] args) throws Exception {
nullWatcher = new NullWatcher(new ReadyMonitor(ZooInfoViewer.class.getSimpleName(), 20_000L));

ZooInfoViewer.Opts opts = new ZooInfoViewer.Opts();
opts.parseArgs(ZooInfoViewer.class.getName(), args);
Expand All @@ -115,16 +115,15 @@ public void execute(String[] args) throws Exception {
log.info("print properties: {}", opts.printProps);
log.info("print instances: {}", opts.printInstanceIds);

var conf = opts.getSiteConfiguration();

ZooReader zooReader = new ZooReaderWriter(conf);

try (ServerContext context = new ServerContext(conf)) {
InstanceId iid = context.getInstanceID();
generateReport(iid, opts, zooReader);
try (ServerContext context = getContext(opts)) {
generateReport(context.getInstanceID(), opts, context.getZooReader());
}
}

ServerContext getContext(ZooInfoViewer.Opts opts) {
return new ServerContext(opts.getSiteConfiguration());
}

void generateReport(final InstanceId iid, final ZooInfoViewer.Opts opts,
final ZooReader zooReader) throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@
public class ZooPropEditor implements KeywordExecutable {

private static final Logger LOG = LoggerFactory.getLogger(ZooPropEditor.class);
private final NullWatcher nullWatcher =
new NullWatcher(new ReadyMonitor(ZooInfoViewer.class.getSimpleName(), 20_000L));
private NullWatcher nullWatcher;

/**
* No-op constructor - provided so ServiceLoader autoload does not consume resources.
Expand All @@ -82,6 +81,8 @@ public String description() {

@Override
public void execute(String[] args) throws Exception {
nullWatcher = new NullWatcher(new ReadyMonitor(ZooInfoViewer.class.getSimpleName(), 20_000L));

ZooPropEditor.Opts opts = new ZooPropEditor.Opts();
opts.parseArgs(ZooPropEditor.class.getName(), args);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.accumulo.core.Constants.ZTABLES;
import static org.apache.accumulo.core.Constants.ZTABLE_NAME;
import static org.apache.accumulo.core.Constants.ZTABLE_NAMESPACE;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.eq;
Expand All @@ -50,13 +51,18 @@
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.server.MockServerContext;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.codec.VersionedPropCodec;
import org.apache.accumulo.server.conf.codec.VersionedProperties;
import org.apache.accumulo.server.conf.store.NamespacePropKey;
import org.apache.accumulo.server.conf.store.PropStore;
import org.apache.accumulo.server.conf.store.SystemPropKey;
import org.apache.accumulo.server.conf.store.TablePropKey;
import org.apache.accumulo.server.conf.store.impl.PropStoreWatcher;
import org.apache.accumulo.server.conf.store.impl.ZooPropStore;
import org.apache.zookeeper.data.Stat;
import org.easymock.Capture;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -139,7 +145,7 @@ public void allOpts() {
public void instanceIdOutputTest() throws Exception {
String uuid = UUID.randomUUID().toString();

ZooReader zooReader = createMock(ZooReader.class);
ZooReaderWriter zooReader = createMock(ZooReaderWriter.class);
var instanceName = "test";
expect(zooReader.getChildren(eq(ZROOT + ZINSTANCES))).andReturn(List.of(instanceName)).once();
expect(zooReader.getData(eq(ZROOT + ZINSTANCES + "/" + instanceName)))
Expand All @@ -148,14 +154,24 @@ public void instanceIdOutputTest() throws Exception {

String testFileName = "./target/zoo-info-viewer-" + System.currentTimeMillis() + ".txt";

ZooInfoViewer.Opts opts = new ZooInfoViewer.Opts();
opts.parseArgs(ZooInfoViewer.class.getName(),
new String[] {"--print-instances", "--outfile", testFileName});
ServerContext context =
MockServerContext.getWithZK(InstanceId.of(instanceName), instanceName, 20_000);
expect(context.getZooReader()).andReturn(zooReader).once();
context.close();

ZooInfoViewer viewer = new ZooInfoViewer();
viewer.generateReport(InstanceId.of(uuid), opts, zooReader);
replay(context);

verify(zooReader);
class ZooInfoViewerTestClazz extends ZooInfoViewer {
@Override
ServerContext getContext(ZooInfoViewer.Opts ots) {
return context;
}
}

ZooInfoViewer viewer = new ZooInfoViewerTestClazz();
viewer.execute(new String[] {"--print-instances", "--outfile", testFileName});

verify(zooReader, context);

String line;
try (Scanner scanner = new Scanner(new File(testFileName))) {
Expand Down Expand Up @@ -286,14 +302,31 @@ public void propTest() throws Exception {

String testFileName = "./target/zoo-info-viewer-" + System.currentTimeMillis() + ".txt";

ZooInfoViewer.Opts opts = new ZooInfoViewer.Opts();
opts.parseArgs(ZooInfoViewer.class.getName(),
new String[] {"--print-props", "--outfile", testFileName});
ZooReaderWriter zrw = createMock(ZooReaderWriter.class);
expect(zrw.getSessionTimeout()).andReturn(2_000).anyTimes();
expect(zrw.exists(eq("/accumulo/" + iid), anyObject())).andReturn(true).anyTimes();

ZooInfoViewer viewer = new ZooInfoViewer();
viewer.generateReport(InstanceId.of(uuid), opts, zooReader);
replay(zrw);

verify(zooReader);
PropStore propStore = ZooPropStore.initialize(iid, zrw);

ServerContext context = MockServerContext.getMockContextWithPropStore(iid, zrw, propStore);
expect(context.getZooReader()).andReturn(zooReader).once();
context.close();

replay(context);

class ZooInfoViewerTestClazz extends ZooInfoViewer {
@Override
ServerContext getContext(ZooInfoViewer.Opts ots) {
return context;
}
}

ZooInfoViewer viewer = new ZooInfoViewerTestClazz();
viewer.execute(new String[] {"--print-props", "--outfile", testFileName});

verify(zooReader, context);

Map<String,String> props = new HashMap<>();
try (Scanner scanner = new Scanner(new File(testFileName))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,21 @@ public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability tdura

UpdateSession us =
new UpdateSession(new TservConstraintEnv(server.getContext(), security, credentials),
credentials, durability);
credentials, durability) {
@Override
public boolean cleanup() {
// This is called when a client abandons a session. When this happens need to decrement
// any queued mutations.
if (queuedMutationSize > 0) {
log.trace(
"cleaning up abandoned update session, decrementing totalQueuedMutationSize by {}",
queuedMutationSize);
server.updateTotalQueuedMutationSize(-queuedMutationSize);
queuedMutationSize = 0;
}
return true;
}
};
return server.sessionManager.createSession(us, false);
}

Expand All @@ -267,8 +281,8 @@ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent)) {
return;
}
if (us.currentTablet == null
&& (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) {
if (us.currentTablet == null && (us.failures.containsKey(keyExtent)
|| us.authFailures.containsKey(keyExtent) || us.unhandledException != null)) {
// if there were previous failures, then do not accept additional writes
return;
}
Expand Down Expand Up @@ -339,6 +353,11 @@ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
for (TMutation tmutation : tmutations) {
Mutation mutation = new ServerMutation(tmutation);
// Deserialize the mutation in an attempt to check for data corruption that happened on
// the network. This will avoid writing a corrupt mutation to the write ahead log and
// failing after its written to the write ahead log when it is deserialized to update the
// in memory map.
mutation.getUpdates();
mutations.add(mutation);
additionalMutationSize += mutation.numBytes();
}
Expand All @@ -358,6 +377,15 @@ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
}
}
}
} catch (RuntimeException e) {
// This method is a thrift oneway method so an exception from it will not make it back to the
// client. Need to record the exception and set the session such that any future updates to
// the session are ignored.
us.unhandledException = e;
us.currentTablet = null;

// Rethrowing it will cause logging from thrift, so not adding logging here.
throw e;
} finally {
if (reserved) {
server.sessionManager.unreserveSession(us);
Expand Down Expand Up @@ -541,6 +569,20 @@ public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDE
}

try {
if (us.unhandledException != null) {
// Since flush() is not being called, any memory added to the global queued mutations
// counter will not be decremented. So do that here before throwing an exception.
server.updateTotalQueuedMutationSize(-us.queuedMutationSize);
us.queuedMutationSize = 0;
// make this memory available for GC
us.queuedMutations.clear();

// Something unexpected happened during this write session, so throw an exception here to
// cause a TApplicationException on the client side.
throw new IllegalStateException(
"Write session " + updateID + " saw an unexpected exception", us.unhandledException);
}

// clients may or may not see data from an update session while
// it is in progress, however when the update session is closed
// want to ensure that reads wait for the write to finish
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,12 @@ public void run() {
}

public long updateTotalQueuedMutationSize(long additionalMutationSize) {
return totalQueuedMutationSize.addAndGet(additionalMutationSize);
var newTotal = totalQueuedMutationSize.addAndGet(additionalMutationSize);
if (log.isTraceEnabled()) {
log.trace("totalQueuedMutationSize is now {} after adding {}", newTotal,
additionalMutationSize);
}
return newTotal;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class UpdateSession extends Session {
public long flushTime = 0;
public long queuedMutationSize = 0;
public final Durability durability;
public Exception unhandledException = null;

public UpdateSession(TservConstraintEnv env, TCredentials credentials, Durability durability) {
super(credentials);
Expand Down
Loading

0 comments on commit d513309

Please sign in to comment.