diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java index 415d6548f7..b78488a8d0 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java @@ -19,7 +19,8 @@ import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.security.Authorizations; import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; @@ -56,7 +57,7 @@ public class ScannerFactory { protected Map consistencyLevelMap = new HashMap<>(); protected Map> executionHintMap = new HashMap<>(); - private static final Logger log = Logger.getLogger(ScannerFactory.class); + private static final Logger log = LoggerFactory.getLogger(ScannerFactory.class); /** * Preferred constructor, builds scanner factory from configs @@ -135,7 +136,7 @@ public void updateConfigs(GenericQueryConfiguration genericConfig) { } if (log.isDebugEnabled()) { - log.debug("Created ScannerFactory " + System.identityHashCode(this) + " is wrapped ? " + (client instanceof WrappedConnector)); + log.debug("Created ScannerFactory {}, wrapped={}", System.identityHashCode(this), (client instanceof WrappedConnector)); } } @@ -145,9 +146,9 @@ public Scanner newSingleScanner(String tableName, Set auths, Que applyConfigs(bs, tableName); - log.debug("Created scanner " + System.identityHashCode(bs)); + log.debug("Created scanner {}", System.identityHashCode(bs)); if (log.isTraceEnabled()) { - log.trace("Adding instance " + bs.hashCode()); + log.trace("Adding instance {}", bs.hashCode()); } synchronized (open) { @@ -206,9 +207,9 @@ public BatchScanner newScanner(String tableName, Set auths, int applyConfigs(bs, hintKey, tableName); - log.debug("Created scanner " + System.identityHashCode(bs)); + log.debug("Created scanner {}", System.identityHashCode(bs)); if (log.isTraceEnabled()) { - log.trace("Adding instance " + bs.hashCode()); + log.trace("Adding instance {}", bs.hashCode()); } synchronized (open) { if (open.get()) { @@ -230,9 +231,9 @@ public BatchScanner newScanner(String tableName, Set auths, int applyConfigs(bs, tableName); - log.debug("Created scanner " + System.identityHashCode(bs)); + log.debug("Created scanner {}", System.identityHashCode(bs)); if (log.isTraceEnabled()) { - log.trace("Adding instance " + bs.hashCode()); + log.trace("Adding instance {}", bs.hashCode()); } synchronized (open) { if (open.get()) { @@ -358,7 +359,7 @@ public T newLimitedScanner(Class wrapper, final St Preconditions.checkNotNull(wrapper); Preconditions.checkArgument(open.get(), "Factory has been locked. No New scanners can be created"); - log.debug("Creating limited scanner whose max threads is is " + scanQueue.getCapacity() + " and max capacity is " + maxQueue); + log.debug("Creating limited scanner whose max threads is {} and max capacity is {}", scanQueue.getCapacity(), maxQueue); ScanSessionStats stats = null; if (accrueStats) { @@ -375,9 +376,9 @@ public T newLimitedScanner(Class wrapper, final St applyConfigs(session, hintKey, tableName); - log.debug("Created session " + System.identityHashCode(session)); + log.debug("Created session {}", System.identityHashCode(session)); if (log.isTraceEnabled()) { - log.trace("Adding instance " + session.hashCode()); + log.trace("Adding instance {}", session.hashCode()); } synchronized (open) { if (open.get()) { @@ -414,7 +415,7 @@ public RangeStreamScanner newRangeScanner(String tableName, Set public boolean close(ScannerBase bs) { try { - log.debug("Closed scanner " + System.identityHashCode(bs)); + log.debug("Closed scanner {}", System.identityHashCode(bs)); if (instances.remove(bs)) { if (log.isTraceEnabled()) { log.trace("Closing instance " + bs.hashCode()); @@ -460,16 +461,16 @@ public boolean lockdown() { public void close(ScannerSession bs) { try { - log.debug("Closed session " + System.identityHashCode(bs)); + log.debug("Closed session {}", System.identityHashCode(bs)); if (sessionInstances.remove(bs)) { if (log.isTraceEnabled()) { - log.trace("Closing instance " + bs.hashCode()); + log.trace("Closing instance {}", bs.hashCode()); } bs.close(); } } catch (Exception e) { // ANY EXCEPTION HERE CAN SAFELY BE IGNORED - log.trace("Exception closing ScannerSession, can be safely ignored: {}", e); + log.trace("Exception closing ScannerSession, can be safely ignored:", e); } } @@ -539,13 +540,29 @@ public void applyConfigs(ScannerBase scannerBase, String tableName) { public void applyConfigs(ScannerBase scannerBase, String hintKey, String tableName) { if (consistencyLevelMap != null && !consistencyLevelMap.isEmpty()) { - String key = consistencyLevelMap.containsKey(hintKey) ? hintKey : tableName; - scannerBase.setConsistencyLevel(consistencyLevelMap.get(key)); + ScannerBase.ConsistencyLevel level = consistencyLevelMap.get(hintKey); + if (level == null) { + level = consistencyLevelMap.get(tableName); + } + + if (level == null) { + log.trace("no consistency level found for table: {} key: {}", tableName, hintKey); + } else { + scannerBase.setConsistencyLevel(level); + } } if (executionHintMap != null && !executionHintMap.isEmpty()) { - String key = executionHintMap.containsKey(hintKey) ? hintKey : tableName; - scannerBase.setExecutionHints(executionHintMap.get(key)); + Map hint = executionHintMap.get(hintKey); + if (hint == null) { + hint = executionHintMap.get(tableName); + } + + if (hint == null) { + log.trace("no execution hint found for table: {} key: {} ", tableName, hintKey); + } else { + scannerBase.setExecutionHints(hint); + } } } @@ -563,18 +580,28 @@ protected void applyConfigs(ScannerSession scannerSession, String hintKey, Strin SessionOptions options = scannerSession.getOptions(); if (consistencyLevelMap != null && !consistencyLevelMap.isEmpty()) { - String key = consistencyLevelMap.containsKey(hintKey) ? hintKey : tableName; + ScannerBase.ConsistencyLevel level = consistencyLevelMap.get(hintKey); + if (level == null) { + level = consistencyLevelMap.get(tableName); + } - if (consistencyLevelMap.containsKey(key)) { - options.setConsistencyLevel(consistencyLevelMap.get(key)); + if (level == null) { + log.trace("no consistency level found for table: {} key: {}", tableName, hintKey); + } else { + options.setConsistencyLevel(level); } } if (executionHintMap != null && !executionHintMap.isEmpty()) { - String key = executionHintMap.containsKey(hintKey) ? hintKey : tableName; + Map hint = executionHintMap.get(hintKey); + if (hint == null) { + hint = executionHintMap.get(tableName); + } - if (executionHintMap.containsKey(key)) { - options.setExecutionHints(executionHintMap.get(key)); + if (hint == null) { + log.trace("no execution hint found for table: {} key: {} ", tableName, hintKey); + } else { + options.setExecutionHints(hint); } } diff --git a/warehouse/query-core/src/test/java/datawave/query/tables/ScannerFactoryTest.java b/warehouse/query-core/src/test/java/datawave/query/tables/ScannerFactoryTest.java index 499773360b..6f55d265b4 100644 --- a/warehouse/query-core/src/test/java/datawave/query/tables/ScannerFactoryTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/tables/ScannerFactoryTest.java @@ -33,6 +33,8 @@ class ScannerFactoryTest { private static ScannerFactory scannerFactory; private static final ShardQueryConfiguration config = new ShardQueryConfiguration(); + private static final String ALT_INDEX = "altIndex"; + @BeforeAll public static void before() throws Exception { AccumuloClient client = new MyAccumuloClient("", instance); @@ -40,6 +42,8 @@ public static void before() throws Exception { scannerFactory = new ScannerFactory(config); client.tableOperations().create(TableName.SHARD); + client.tableOperations().create(TableName.SHARD_INDEX); + client.tableOperations().create(ALT_INDEX); client.instanceOperations().setProperty("accumulo.instance.name", "required-for-tests"); } @@ -171,6 +175,72 @@ void testRFileScanner() { assertEventualConsistency(scanner); } + @Test + public void testSingleScannerWithAbsentTableName() throws Exception { + Scanner scanner = scannerFactory.newSingleScanner(ALT_INDEX, getAuths(), getQuery()); + assertImmediateConsistency(scanner); + } + + @Test + public void testScannerWithAbsentTableName() throws Exception { + BatchScanner scanner = scannerFactory.newScanner(ALT_INDEX, getQuery()); + assertImmediateConsistency(scanner); + + scanner = scannerFactory.newScanner(ALT_INDEX, getAuths(), 1, getQuery(), "ALT_HINT"); + assertImmediateConsistency(scanner); + + scanner = scannerFactory.newScanner(ALT_INDEX, getAuths(), 1, getQuery(), null); + assertImmediateConsistency(scanner); + } + + @Test + public void testQueryScannerWithAbsentTableName() throws Exception { + BatchScannerSession scanner = scannerFactory.newQueryScanner(ALT_INDEX, getAuths(), getQuery()); + assertImmediateConsistency(scanner); + + scanner = scannerFactory.newQueryScanner(ALT_INDEX, getAuths(), getQuery(), "ALT_HINT"); + assertImmediateConsistency(scanner); + + scanner = scannerFactory.newQueryScanner(ALT_INDEX, getAuths(), getQuery(), null); + assertImmediateConsistency(scanner); + } + + @Test + public void testLimitedAnyFieldScannerWithAbsentTableName() throws Exception { + AnyFieldScanner scanner = scannerFactory.newLimitedScanner(AnyFieldScanner.class, ALT_INDEX, getAuths(), getQuery()); + assertImmediateConsistency(scanner); + + scanner = scannerFactory.newLimitedScanner(AnyFieldScanner.class, ALT_INDEX, getAuths(), getQuery(), "ALT_HINT"); + assertImmediateConsistency(scanner); + + scanner = scannerFactory.newLimitedScanner(AnyFieldScanner.class, ALT_INDEX, getAuths(), getQuery(), null); + assertImmediateConsistency(scanner); + } + + @Test + public void testLimitedRangeStreamScannerWithAbsentTableName() throws Exception { + RangeStreamScanner scanner = scannerFactory.newLimitedScanner(RangeStreamScanner.class, ALT_INDEX, getAuths(), getQuery()); + assertImmediateConsistency(scanner); + + scanner = scannerFactory.newLimitedScanner(RangeStreamScanner.class, ALT_INDEX, getAuths(), getQuery(), "ALT_HINT"); + assertImmediateConsistency(scanner); + + scanner = scannerFactory.newLimitedScanner(RangeStreamScanner.class, ALT_INDEX, getAuths(), getQuery(), null); + assertImmediateConsistency(scanner); + } + + @Test + public void testLimitedBatchScannerSessionWithAbsentTableName() throws Exception { + BatchScannerSession scanner = scannerFactory.newLimitedScanner(BatchScannerSession.class, ALT_INDEX, getAuths(), getQuery()); + assertImmediateConsistency(scanner); + + scanner = scannerFactory.newLimitedScanner(BatchScannerSession.class, ALT_INDEX, getAuths(), getQuery(), "ALT_HINT"); + assertImmediateConsistency(scanner); + + scanner = scannerFactory.newLimitedScanner(BatchScannerSession.class, ALT_INDEX, getAuths(), getQuery(), null); + assertImmediateConsistency(scanner); + } + private void setEventualConsistency() { Map consistencyLevels = new HashMap<>(); consistencyLevels.put(TableName.SHARD, ScannerBase.ConsistencyLevel.EVENTUAL);