diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/EmptySelectionOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/EmptySelectionOperator.java index 044d4c5da230..ae846d25c83c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/EmptySelectionOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/EmptySelectionOperator.java @@ -45,7 +45,7 @@ public class EmptySelectionOperator extends BaseOperator expressions = - SelectionOperatorUtils.extractExpressions(selection.getSelectionColumns(), indexSegment, null); + SelectionOperatorUtils.extractExpressions(selection.getSelectionColumns(), indexSegment); int numExpressions = expressions.size(); String[] columnNames = new String[numExpressions]; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java index 6c5cb3c65af8..f4661eb12484 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java @@ -55,7 +55,7 @@ public class SelectionOnlyOperator extends BaseOperator _sortSequence; private final List _expressions; private final TransformResultMetadata[] _expressionMetadata; private final Dictionary[] _dictionaries; @@ -60,9 +60,8 @@ public class SelectionOrderByOperator extends BaseOperator(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY), - getComparator()); + getComparator(selection.getSelectionSortSequence())); } - private Comparator getComparator() { + private Comparator getComparator(List sortSequence) { // Compare all single-value columns - int numOrderByExpressions = _sortSequence.size(); + int numOrderByExpressions = sortSequence.size(); List valueIndexList = new ArrayList<>(numOrderByExpressions); for (int i = 0; i < numOrderByExpressions; i++) { if (_expressionMetadata[i].isSingleValue()) { @@ -100,38 +99,50 @@ private Comparator getComparator() { int numValuesToCompare = valueIndexList.size(); int[] valueIndices = new int[numValuesToCompare]; - Comparator[] valueComparators = new Comparator[numValuesToCompare]; + DataType[] dataTypes = new DataType[numValuesToCompare]; + // Use multiplier -1 or 1 to control ascending/descending order + int[] multipliers = new int[numValuesToCompare]; for (int i = 0; i < numValuesToCompare; i++) { int valueIndex = valueIndexList.get(i); valueIndices[i] = valueIndex; - switch (_expressionMetadata[valueIndex].getDataType()) { - case INT: - valueComparators[i] = (Comparator) Integer::compare; - break; - case LONG: - valueComparators[i] = (Comparator) Long::compare; - break; - case FLOAT: - valueComparators[i] = (Comparator) Float::compare; - break; - case DOUBLE: - valueComparators[i] = (Comparator) Double::compare; - break; - case STRING: - valueComparators[i] = Comparator.naturalOrder(); - break; - case BYTES: - valueComparators[i] = (Comparator) ByteArray::compare; - break; - default: - throw new IllegalStateException(); - } - if (_sortSequence.get(valueIndex).isIsAsc()) { - valueComparators[i] = valueComparators[i].reversed(); - } + dataTypes[i] = _expressionMetadata[valueIndex].getDataType(); + multipliers[i] = sortSequence.get(valueIndex).isIsAsc() ? -1 : 1; } - return new SelectionOperatorUtils.RowComparator(valueIndices, valueComparators); + return (o1, o2) -> { + for (int i = 0; i < numValuesToCompare; i++) { + int index = valueIndices[i]; + Serializable v1 = o1[index]; + Serializable v2 = o2[index]; + int result; + switch (dataTypes[i]) { + case INT: + result = ((Integer) v1).compareTo((Integer) v2); + break; + case LONG: + result = ((Long) v1).compareTo((Long) v2); + break; + case FLOAT: + result = ((Float) v1).compareTo((Float) v2); + break; + case DOUBLE: + result = ((Double) v1).compareTo((Double) v2); + break; + case STRING: + result = ((String) v1).compareTo((String) v2); + break; + case BYTES: + result = ByteArray.compare((byte[]) v1, (byte[]) v2); + break; + default: + throw new IllegalStateException(); + } + if (result != 0) { + return result * multipliers[i]; + } + } + return 0; + }; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java index 0b87eb74270f..d1ac9fcdba13 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java @@ -38,7 +38,7 @@ public class SelectionPlanNode implements PlanNode { private final IndexSegment _indexSegment; private final Selection _selection; - private TransformPlanNode _transformPlanNode; + private final TransformPlanNode _transformPlanNode; public SelectionPlanNode(IndexSegment indexSegment, BrokerRequest brokerRequest) { _indexSegment = indexSegment; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java index 84d92879ce85..494e123ae948 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java @@ -58,19 +58,17 @@ public TransformPlanNode(IndexSegment indexSegment, BrokerRequest brokerRequest) * Helper method to extract projection columns and transform expressions from the given broker request. */ private void extractColumnsAndTransforms(BrokerRequest brokerRequest, IndexSegment indexSegment) { + Set columns = new HashSet<>(); if (brokerRequest.isSetAggregationsInfo()) { // Extract aggregation expressions for (AggregationInfo aggregationInfo : brokerRequest.getAggregationsInfo()) { if (!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) { - addExpressionColumn(AggregationFunctionUtils.getColumn(aggregationInfo)); + columns.add(AggregationFunctionUtils.getColumn(aggregationInfo)); } } - // Extract group-by expressions if (brokerRequest.isSetGroupBy()) { - for (String column : brokerRequest.getGroupBy().getExpressions()) { - addExpressionColumn(column); - } + columns.addAll(brokerRequest.getGroupBy().getExpressions()); } } else { Selection selection = brokerRequest.getSelections(); @@ -83,9 +81,7 @@ private void extractColumnsAndTransforms(BrokerRequest brokerRequest, IndexSegme _expressions.add(new TransformExpressionTree(new IdentifierAstNode(column))); } } else { - for (String column : selectionColumns) { - addExpressionColumn(column); - } + columns.addAll(selectionColumns); } // Extract order-by expressions and update maxDocPerNextCall @@ -96,7 +92,10 @@ private void extractColumnsAndTransforms(BrokerRequest brokerRequest, IndexSegme _maxDocPerNextCall = Math.min(selection.getSize(), _maxDocPerNextCall); } else { for (SelectionSort selectionSort : sortSequence) { - addExpressionColumn(selectionSort.getColumn()); + String orderByColumn = selectionSort.getColumn(); + if (!_projectionColumns.contains(orderByColumn)) { + columns.add(orderByColumn); + } } } } else { @@ -105,12 +104,11 @@ private void extractColumnsAndTransforms(BrokerRequest brokerRequest, IndexSegme _maxDocPerNextCall = 1; } } - } - - private void addExpressionColumn(String column) { - TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(column); - expression.getColumns(_projectionColumns); - _expressions.add(expression); + for (String column : columns) { + TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(column); + expression.getColumns(_projectionColumns); + _expressions.add(expression); + } } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java index b5ecb86e467a..cdd2254b7cbc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java @@ -60,7 +60,6 @@ */ public class SelectionOperatorService { private final List _selectionColumns; - private final List _sortSequence; private final DataSchema _dataSchema; private final int _offset; private final int _numRowsToKeep; @@ -74,13 +73,12 @@ public class SelectionOperatorService { */ public SelectionOperatorService(Selection selection, DataSchema dataSchema) { _selectionColumns = SelectionOperatorUtils.getSelectionColumns(selection.getSelectionColumns(), dataSchema); - _sortSequence = selection.getSelectionSortSequence(); _dataSchema = dataSchema; // Select rows from offset to offset + size. _offset = selection.getOffset(); _numRowsToKeep = _offset + selection.getSize(); _rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY), - getTypeCompatibleComparator()); + getTypeCompatibleComparator(selection.getSelectionSortSequence())); } /** @@ -89,9 +87,9 @@ public SelectionOperatorService(Selection selection, DataSchema dataSchema) { * * @return flexible {@link Comparator} for selection rows. */ - private Comparator getTypeCompatibleComparator() { + private Comparator getTypeCompatibleComparator(List sortSequence) { // Compare all single-value columns - int numOrderByExpressions = _sortSequence.size(); + int numOrderByExpressions = sortSequence.size(); List valueIndexList = new ArrayList<>(numOrderByExpressions); for (int i = 0; i < numOrderByExpressions; i++) { if (!_dataSchema.getColumnDataType(i).isArray()) { @@ -101,21 +99,33 @@ private Comparator getTypeCompatibleComparator() { int numValuesToCompare = valueIndexList.size(); int[] valueIndices = new int[numValuesToCompare]; - Comparator[] valueComparators = new Comparator[numValuesToCompare]; + boolean[] isNumber = new boolean[numValuesToCompare]; + // Use multiplier -1 or 1 to control ascending/descending order + int[] multipliers = new int[numValuesToCompare]; for (int i = 0; i < numValuesToCompare; i++) { int valueIndex = valueIndexList.get(i); valueIndices[i] = valueIndex; - if (_dataSchema.getColumnDataType(i).isNumber()) { - valueComparators[i] = Comparator.comparingDouble(Number::doubleValue); - } else { - valueComparators[i] = Comparator.naturalOrder(); - } - if (_sortSequence.get(valueIndex).isIsAsc()) { - valueComparators[i] = valueComparators[i].reversed(); - } + isNumber[i] = _dataSchema.getColumnDataType(valueIndex).isNumber(); + multipliers[i] = sortSequence.get(valueIndex).isIsAsc() ? -1 : 1; } - return new SelectionOperatorUtils.RowComparator(valueIndices, valueComparators); + return (o1, o2) -> { + for (int i = 0; i < numValuesToCompare; i++) { + int index = valueIndices[i]; + Serializable v1 = o1[index]; + Serializable v2 = o2[index]; + int result; + if (isNumber[i]) { + result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue()); + } else { + result = ((String) v1).compareTo((String) v2); + } + if (result != 0) { + return result * multipliers[i]; + } + } + return 0; + }; } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java index 1d3eb8fabd0a..783b35780a1b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java @@ -23,7 +23,6 @@ import java.text.DecimalFormatSymbols; import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -87,24 +86,51 @@ private SelectionOperatorUtils() { ThreadLocal.withInitial(() -> new DecimalFormat(DOUBLE_PATTERN, DECIMAL_FORMAT_SYMBOLS)); /** - * Extracts the expressions from a selection query, expands {@code 'SELECT *'} to all physical columns if applies. - *

For selection order-by queries, order-by expressions will be put at the front. The expressions returned are - * deduplicated. - * DO NOT change the order of the expressions returned because broker relies on that to process the query. + * Extracts the expressions from a selection-only query, expands {@code 'SELECT *'} to all physical columns if + * applies. + *

NOTE: DO NOT change the order of the expressions returned because broker relies on that to process the query. */ public static List extractExpressions(List selectionColumns, - IndexSegment indexSegment, @Nullable List sortSequence) { - Set expressionSet = new HashSet<>(); - List expressions = new ArrayList<>(); + IndexSegment indexSegment) { + if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) { + // For 'SELECT *', sort all physical columns so that the order is deterministic + selectionColumns = new ArrayList<>(indexSegment.getPhysicalColumnNames()); + selectionColumns.sort(null); - if (sortSequence != null) { - for (SelectionSort selectionSort : sortSequence) { - TransformExpressionTree orderByExpression = - TransformExpressionTree.compileToExpressionTree(selectionSort.getColumn()); - if (expressionSet.add(orderByExpression)) { - expressions.add(orderByExpression); + List expressions = new ArrayList<>(selectionColumns.size()); + for (String selectionColumn : selectionColumns) { + expressions.add(new TransformExpressionTree(new IdentifierAstNode(selectionColumn))); + } + return expressions; + } else { + // Note: selection expressions have been standardized during query compilation + Set selectionColumnSet = new HashSet<>(); + List expressions = new ArrayList<>(selectionColumns.size()); + for (String selectionColumn : selectionColumns) { + if (selectionColumnSet.add(selectionColumn)) { + expressions.add(TransformExpressionTree.compileToExpressionTree(selectionColumn)); } } + return expressions; + } + } + + /** + * Extracts the expressions from a selection order-by query, expands {@code 'SELECT *'} to all physical columns if + * applies. + *

Order-by expressions will be put at the front. The expressions returned are deduplicated. + *

NOTE: DO NOT change the order of the expressions returned because broker relies on that to process the query. + */ + public static List extractExpressions(List selectionColumns, + IndexSegment indexSegment, List sortSequence) { + Set columnSet = new HashSet<>(); + List expressions = new ArrayList<>(); + + // NOTE: order-by expressions have been standardized and deduplicated during query compilation + for (SelectionSort selectionSort : sortSequence) { + String orderByColumn = selectionSort.getColumn(); + columnSet.add(orderByColumn); + expressions.add(TransformExpressionTree.compileToExpressionTree(orderByColumn)); } if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) { @@ -113,17 +139,15 @@ public static List extractExpressions(List sele selectionColumns.sort(null); for (String selectionColumn : selectionColumns) { - TransformExpressionTree selectionExpression = - new TransformExpressionTree(new IdentifierAstNode(selectionColumn)); - if (expressionSet.add(selectionExpression)) { - expressions.add(selectionExpression); + if (!columnSet.contains(selectionColumn)) { + expressions.add(new TransformExpressionTree(new IdentifierAstNode(selectionColumn))); } } } else { + // Note: selection expressions have been standardized during query compilation for (String selectionColumn : selectionColumns) { - TransformExpressionTree selectionExpression = TransformExpressionTree.compileToExpressionTree(selectionColumn); - if (expressionSet.add(selectionExpression)) { - expressions.add(selectionExpression); + if (columnSet.add(selectionColumn)) { + expressions.add(TransformExpressionTree.compileToExpressionTree(selectionColumn)); } } } @@ -500,7 +524,7 @@ private static Serializable getFormattedValue(Serializable value, DataSchema.Col length = ints.length; formattedValue = new String[length]; for (int i = 0; i < length; i++) { - formattedValue[i] = longFormat.format((long) ints[i]); + formattedValue[i] = longFormat.format(ints[i]); } } else { long[] longs = (long[]) value; @@ -544,7 +568,7 @@ private static Serializable getFormattedValue(Serializable value, DataSchema.Col length = floats.length; formattedValue = new String[length]; for (int i = 0; i < length; i++) { - formattedValue[i] = doubleFormat.format((double) floats[i]); + formattedValue[i] = doubleFormat.format(floats[i]); } return formattedValue; } else { @@ -579,42 +603,4 @@ public static void addToPriorityQueue(T value, PriorityQueue queue, int m queue.offer(value); } } - - /** - * Helper Comparator class to compare rows. - *

Two arguments are expected to construct the comparator: - *

    - *
  • - * Value indices: an array of column indices in each row where the values need to be compared (only the - * single-value order-by columns need to be compared) - *
  • - *
  • - * Value comparators: an array of Comparator, where each element is the Comparator for the corresponding column in - * the value indices array - *
  • - *
- */ - public static class RowComparator implements Comparator { - private final int[] _valueIndices; - private final Comparator[] _valueComparators; - - public RowComparator(int[] valueIndices, Comparator[] valueComparators) { - _valueIndices = valueIndices; - _valueComparators = valueComparators; - } - - @SuppressWarnings("unchecked") - @Override - public int compare(Serializable[] o1, Serializable[] o2) { - int numValuesToCompare = _valueIndices.length; - for (int i = 0; i < numValuesToCompare; i++) { - int valueIndex = _valueIndices[i]; - int result = _valueComparators[i].compare(o1[valueIndex], o2[valueIndex]); - if (result != 0) { - return result; - } - } - return 0; - } - } } diff --git a/pinot-core/src/test/java/org/apache/pinot/query/selection/SelectionOperatorServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/query/selection/SelectionOperatorServiceTest.java index 2b6c42b164cb..5ae30161a8fb 100644 --- a/pinot-core/src/test/java/org/apache/pinot/query/selection/SelectionOperatorServiceTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/query/selection/SelectionOperatorServiceTest.java @@ -91,7 +91,7 @@ public void testExtractExpressions() { IndexSegment indexSegment = mock(IndexSegment.class); when(indexSegment.getPhysicalColumnNames()).thenReturn(new HashSet<>(Arrays.asList("foo", "bar", "foobar"))); List expressions = - SelectionOperatorUtils.extractExpressions(selectionColumns, indexSegment, null); + SelectionOperatorUtils.extractExpressions(selectionColumns, indexSegment); assertEquals(expressions.size(), 5); assertEquals(expressions.get(0).toString(), "add(foo,'1')"); assertEquals(expressions.get(1).toString(), "foo"); @@ -100,7 +100,7 @@ public void testExtractExpressions() { assertEquals(expressions.get(4).toString(), "foobar"); // For 'SELECT *' select only queries, should return all physical columns in alphabetical order - expressions = SelectionOperatorUtils.extractExpressions(Collections.singletonList("*"), indexSegment, null); + expressions = SelectionOperatorUtils.extractExpressions(Collections.singletonList("*"), indexSegment); assertEquals(expressions.size(), 3); assertEquals(expressions.get(0).toString(), "bar"); assertEquals(expressions.get(1).toString(), "foo");