Skip to content

Commit

Permalink
Avoid NPE when selecting hints and consistency levels (#2633)
Browse files Browse the repository at this point in the history
* Avoid NPE when selecting hints and consistency levels

* Clarify log statements based on feedback
  • Loading branch information
apmoriarty authored and hgklohr committed Nov 19, 2024
1 parent 226b048 commit 4dc217f
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

Expand Down Expand Up @@ -56,7 +57,7 @@ public class ScannerFactory {
protected Map<String,ScannerBase.ConsistencyLevel> consistencyLevelMap = new HashMap<>();
protected Map<String,Map<String,String>> executionHintMap = new HashMap<>();

private static final Logger log = Logger.getLogger(ScannerFactory.class);
private static final Logger log = LoggerFactory.getLogger(ScannerFactory.class);

/**
* Preferred constructor, builds scanner factory from configs
Expand Down Expand Up @@ -135,7 +136,7 @@ public void updateConfigs(GenericQueryConfiguration genericConfig) {
}

if (log.isDebugEnabled()) {
log.debug("Created ScannerFactory " + System.identityHashCode(this) + " is wrapped ? " + (client instanceof WrappedConnector));
log.debug("Created ScannerFactory {}, wrapped={}", System.identityHashCode(this), (client instanceof WrappedConnector));
}
}

Expand All @@ -145,9 +146,9 @@ public Scanner newSingleScanner(String tableName, Set<Authorizations> auths, Que

applyConfigs(bs, tableName);

log.debug("Created scanner " + System.identityHashCode(bs));
log.debug("Created scanner {}", System.identityHashCode(bs));
if (log.isTraceEnabled()) {
log.trace("Adding instance " + bs.hashCode());
log.trace("Adding instance {}", bs.hashCode());
}

synchronized (open) {
Expand Down Expand Up @@ -206,9 +207,9 @@ public BatchScanner newScanner(String tableName, Set<Authorizations> auths, int

applyConfigs(bs, hintKey, tableName);

log.debug("Created scanner " + System.identityHashCode(bs));
log.debug("Created scanner {}", System.identityHashCode(bs));
if (log.isTraceEnabled()) {
log.trace("Adding instance " + bs.hashCode());
log.trace("Adding instance {}", bs.hashCode());
}
synchronized (open) {
if (open.get()) {
Expand All @@ -230,9 +231,9 @@ public BatchScanner newScanner(String tableName, Set<Authorizations> auths, int

applyConfigs(bs, tableName);

log.debug("Created scanner " + System.identityHashCode(bs));
log.debug("Created scanner {}", System.identityHashCode(bs));
if (log.isTraceEnabled()) {
log.trace("Adding instance " + bs.hashCode());
log.trace("Adding instance {}", bs.hashCode());
}
synchronized (open) {
if (open.get()) {
Expand Down Expand Up @@ -358,7 +359,7 @@ public <T extends ScannerSession> T newLimitedScanner(Class<T> wrapper, final St
Preconditions.checkNotNull(wrapper);
Preconditions.checkArgument(open.get(), "Factory has been locked. No New scanners can be created");

log.debug("Creating limited scanner whose max threads is is " + scanQueue.getCapacity() + " and max capacity is " + maxQueue);
log.debug("Creating limited scanner whose max threads is {} and max capacity is {}", scanQueue.getCapacity(), maxQueue);

ScanSessionStats stats = null;
if (accrueStats) {
Expand All @@ -375,9 +376,9 @@ public <T extends ScannerSession> T newLimitedScanner(Class<T> wrapper, final St

applyConfigs(session, hintKey, tableName);

log.debug("Created session " + System.identityHashCode(session));
log.debug("Created session {}", System.identityHashCode(session));
if (log.isTraceEnabled()) {
log.trace("Adding instance " + session.hashCode());
log.trace("Adding instance {}", session.hashCode());
}
synchronized (open) {
if (open.get()) {
Expand Down Expand Up @@ -414,7 +415,7 @@ public RangeStreamScanner newRangeScanner(String tableName, Set<Authorizations>

public boolean close(ScannerBase bs) {
try {
log.debug("Closed scanner " + System.identityHashCode(bs));
log.debug("Closed scanner {}", System.identityHashCode(bs));
if (instances.remove(bs)) {
if (log.isTraceEnabled()) {
log.trace("Closing instance " + bs.hashCode());
Expand Down Expand Up @@ -460,16 +461,16 @@ public boolean lockdown() {

public void close(ScannerSession bs) {
try {
log.debug("Closed session " + System.identityHashCode(bs));
log.debug("Closed session {}", System.identityHashCode(bs));
if (sessionInstances.remove(bs)) {
if (log.isTraceEnabled()) {
log.trace("Closing instance " + bs.hashCode());
log.trace("Closing instance {}", bs.hashCode());
}
bs.close();
}
} catch (Exception e) {
// ANY EXCEPTION HERE CAN SAFELY BE IGNORED
log.trace("Exception closing ScannerSession, can be safely ignored: {}", e);
log.trace("Exception closing ScannerSession, can be safely ignored:", e);
}
}

Expand Down Expand Up @@ -539,13 +540,29 @@ public void applyConfigs(ScannerBase scannerBase, String tableName) {
public void applyConfigs(ScannerBase scannerBase, String hintKey, String tableName) {

if (consistencyLevelMap != null && !consistencyLevelMap.isEmpty()) {
String key = consistencyLevelMap.containsKey(hintKey) ? hintKey : tableName;
scannerBase.setConsistencyLevel(consistencyLevelMap.get(key));
ScannerBase.ConsistencyLevel level = consistencyLevelMap.get(hintKey);
if (level == null) {
level = consistencyLevelMap.get(tableName);
}

if (level == null) {
log.trace("no consistency level found for table: {} key: {}", tableName, hintKey);
} else {
scannerBase.setConsistencyLevel(level);
}
}

if (executionHintMap != null && !executionHintMap.isEmpty()) {
String key = executionHintMap.containsKey(hintKey) ? hintKey : tableName;
scannerBase.setExecutionHints(executionHintMap.get(key));
Map<String,String> hint = executionHintMap.get(hintKey);
if (hint == null) {
hint = executionHintMap.get(tableName);
}

if (hint == null) {
log.trace("no execution hint found for table: {} key: {} ", tableName, hintKey);
} else {
scannerBase.setExecutionHints(hint);
}
}
}

Expand All @@ -563,18 +580,28 @@ protected void applyConfigs(ScannerSession scannerSession, String hintKey, Strin
SessionOptions options = scannerSession.getOptions();

if (consistencyLevelMap != null && !consistencyLevelMap.isEmpty()) {
String key = consistencyLevelMap.containsKey(hintKey) ? hintKey : tableName;
ScannerBase.ConsistencyLevel level = consistencyLevelMap.get(hintKey);
if (level == null) {
level = consistencyLevelMap.get(tableName);
}

if (consistencyLevelMap.containsKey(key)) {
options.setConsistencyLevel(consistencyLevelMap.get(key));
if (level == null) {
log.trace("no consistency level found for table: {} key: {}", tableName, hintKey);
} else {
options.setConsistencyLevel(level);
}
}

if (executionHintMap != null && !executionHintMap.isEmpty()) {
String key = executionHintMap.containsKey(hintKey) ? hintKey : tableName;
Map<String,String> hint = executionHintMap.get(hintKey);
if (hint == null) {
hint = executionHintMap.get(tableName);
}

if (executionHintMap.containsKey(key)) {
options.setExecutionHints(executionHintMap.get(key));
if (hint == null) {
log.trace("no execution hint found for table: {} key: {} ", tableName, hintKey);
} else {
options.setExecutionHints(hint);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@ class ScannerFactoryTest {
private static ScannerFactory scannerFactory;
private static final ShardQueryConfiguration config = new ShardQueryConfiguration();

private static final String ALT_INDEX = "altIndex";

@BeforeAll
public static void before() throws Exception {
AccumuloClient client = new MyAccumuloClient("", instance);
config.setClient(client);
scannerFactory = new ScannerFactory(config);

client.tableOperations().create(TableName.SHARD);
client.tableOperations().create(TableName.SHARD_INDEX);
client.tableOperations().create(ALT_INDEX);
client.instanceOperations().setProperty("accumulo.instance.name", "required-for-tests");
}

Expand Down Expand Up @@ -171,6 +175,72 @@ void testRFileScanner() {
assertEventualConsistency(scanner);
}

@Test
public void testSingleScannerWithAbsentTableName() throws Exception {
Scanner scanner = scannerFactory.newSingleScanner(ALT_INDEX, getAuths(), getQuery());
assertImmediateConsistency(scanner);
}

@Test
public void testScannerWithAbsentTableName() throws Exception {
BatchScanner scanner = scannerFactory.newScanner(ALT_INDEX, getQuery());
assertImmediateConsistency(scanner);

scanner = scannerFactory.newScanner(ALT_INDEX, getAuths(), 1, getQuery(), "ALT_HINT");
assertImmediateConsistency(scanner);

scanner = scannerFactory.newScanner(ALT_INDEX, getAuths(), 1, getQuery(), null);
assertImmediateConsistency(scanner);
}

@Test
public void testQueryScannerWithAbsentTableName() throws Exception {
BatchScannerSession scanner = scannerFactory.newQueryScanner(ALT_INDEX, getAuths(), getQuery());
assertImmediateConsistency(scanner);

scanner = scannerFactory.newQueryScanner(ALT_INDEX, getAuths(), getQuery(), "ALT_HINT");
assertImmediateConsistency(scanner);

scanner = scannerFactory.newQueryScanner(ALT_INDEX, getAuths(), getQuery(), null);
assertImmediateConsistency(scanner);
}

@Test
public void testLimitedAnyFieldScannerWithAbsentTableName() throws Exception {
AnyFieldScanner scanner = scannerFactory.newLimitedScanner(AnyFieldScanner.class, ALT_INDEX, getAuths(), getQuery());
assertImmediateConsistency(scanner);

scanner = scannerFactory.newLimitedScanner(AnyFieldScanner.class, ALT_INDEX, getAuths(), getQuery(), "ALT_HINT");
assertImmediateConsistency(scanner);

scanner = scannerFactory.newLimitedScanner(AnyFieldScanner.class, ALT_INDEX, getAuths(), getQuery(), null);
assertImmediateConsistency(scanner);
}

@Test
public void testLimitedRangeStreamScannerWithAbsentTableName() throws Exception {
RangeStreamScanner scanner = scannerFactory.newLimitedScanner(RangeStreamScanner.class, ALT_INDEX, getAuths(), getQuery());
assertImmediateConsistency(scanner);

scanner = scannerFactory.newLimitedScanner(RangeStreamScanner.class, ALT_INDEX, getAuths(), getQuery(), "ALT_HINT");
assertImmediateConsistency(scanner);

scanner = scannerFactory.newLimitedScanner(RangeStreamScanner.class, ALT_INDEX, getAuths(), getQuery(), null);
assertImmediateConsistency(scanner);
}

@Test
public void testLimitedBatchScannerSessionWithAbsentTableName() throws Exception {
BatchScannerSession scanner = scannerFactory.newLimitedScanner(BatchScannerSession.class, ALT_INDEX, getAuths(), getQuery());
assertImmediateConsistency(scanner);

scanner = scannerFactory.newLimitedScanner(BatchScannerSession.class, ALT_INDEX, getAuths(), getQuery(), "ALT_HINT");
assertImmediateConsistency(scanner);

scanner = scannerFactory.newLimitedScanner(BatchScannerSession.class, ALT_INDEX, getAuths(), getQuery(), null);
assertImmediateConsistency(scanner);
}

private void setEventualConsistency() {
Map<String,ScannerBase.ConsistencyLevel> consistencyLevels = new HashMap<>();
consistencyLevels.put(TableName.SHARD, ScannerBase.ConsistencyLevel.EVENTUAL);
Expand Down

0 comments on commit 4dc217f

Please sign in to comment.