Skip to content

Commit

Permalink
Hotfix 2 for Selection Comparator
Browse files Browse the repository at this point in the history
- Change back to row-based switch so that comparison can be inlined
- Reduce the expression compilation
  • Loading branch information
Jackie-Jiang authored and mcvsubbu committed Nov 8, 2019
1 parent c61d37e commit c90f663
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class EmptySelectionOperator extends BaseOperator<IntermediateResultsBloc

public EmptySelectionOperator(IndexSegment indexSegment, Selection selection, TransformOperator transformOperator) {
List<TransformExpressionTree> expressions =
SelectionOperatorUtils.extractExpressions(selection.getSelectionColumns(), indexSegment, null);
SelectionOperatorUtils.extractExpressions(selection.getSelectionColumns(), indexSegment);

int numExpressions = expressions.size();
String[] columnNames = new String[numExpressions];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class SelectionOnlyOperator extends BaseOperator<IntermediateResultsBlock
public SelectionOnlyOperator(IndexSegment indexSegment, Selection selection, TransformOperator transformOperator) {
_indexSegment = indexSegment;
_transformOperator = transformOperator;
_expressions = SelectionOperatorUtils.extractExpressions(selection.getSelectionColumns(), indexSegment, null);
_expressions = SelectionOperatorUtils.extractExpressions(selection.getSelectionColumns(), indexSegment);

int numExpressions = _expressions.size();
_expressionMetadata = new TransformResultMetadata[numExpressions];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import org.apache.pinot.common.data.FieldSpec.DataType;
import org.apache.pinot.common.request.Selection;
import org.apache.pinot.common.request.SelectionSort;
import org.apache.pinot.common.request.transform.TransformExpressionTree;
Expand All @@ -46,7 +47,6 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl

private final IndexSegment _indexSegment;
private final TransformOperator _transformOperator;
private final List<SelectionSort> _sortSequence;
private final List<TransformExpressionTree> _expressions;
private final TransformResultMetadata[] _expressionMetadata;
private final Dictionary[] _dictionaries;
Expand All @@ -60,9 +60,8 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
public SelectionOrderByOperator(IndexSegment indexSegment, Selection selection, TransformOperator transformOperator) {
_indexSegment = indexSegment;
_transformOperator = transformOperator;
_sortSequence = selection.getSelectionSortSequence();
_expressions =
SelectionOperatorUtils.extractExpressions(selection.getSelectionColumns(), indexSegment, _sortSequence);
_expressions = SelectionOperatorUtils
.extractExpressions(selection.getSelectionColumns(), indexSegment, selection.getSelectionSortSequence());

int numExpressions = _expressions.size();
_expressionMetadata = new TransformResultMetadata[numExpressions];
Expand All @@ -85,12 +84,12 @@ public SelectionOrderByOperator(IndexSegment indexSegment, Selection selection,

_numRowsToKeep = selection.getOffset() + selection.getSize();
_rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
getComparator());
getComparator(selection.getSelectionSortSequence()));
}

private Comparator<Serializable[]> getComparator() {
private Comparator<Serializable[]> getComparator(List<SelectionSort> sortSequence) {
// Compare all single-value columns
int numOrderByExpressions = _sortSequence.size();
int numOrderByExpressions = sortSequence.size();
List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
for (int i = 0; i < numOrderByExpressions; i++) {
if (_expressionMetadata[i].isSingleValue()) {
Expand All @@ -100,38 +99,50 @@ private Comparator<Serializable[]> getComparator() {

int numValuesToCompare = valueIndexList.size();
int[] valueIndices = new int[numValuesToCompare];
Comparator[] valueComparators = new Comparator[numValuesToCompare];
DataType[] dataTypes = new DataType[numValuesToCompare];
// Use multiplier -1 or 1 to control ascending/descending order
int[] multipliers = new int[numValuesToCompare];
for (int i = 0; i < numValuesToCompare; i++) {
int valueIndex = valueIndexList.get(i);
valueIndices[i] = valueIndex;
switch (_expressionMetadata[valueIndex].getDataType()) {
case INT:
valueComparators[i] = (Comparator<Integer>) Integer::compare;
break;
case LONG:
valueComparators[i] = (Comparator<Long>) Long::compare;
break;
case FLOAT:
valueComparators[i] = (Comparator<Float>) Float::compare;
break;
case DOUBLE:
valueComparators[i] = (Comparator<Double>) Double::compare;
break;
case STRING:
valueComparators[i] = Comparator.naturalOrder();
break;
case BYTES:
valueComparators[i] = (Comparator<byte[]>) ByteArray::compare;
break;
default:
throw new IllegalStateException();
}
if (_sortSequence.get(valueIndex).isIsAsc()) {
valueComparators[i] = valueComparators[i].reversed();
}
dataTypes[i] = _expressionMetadata[valueIndex].getDataType();
multipliers[i] = sortSequence.get(valueIndex).isIsAsc() ? -1 : 1;
}

return new SelectionOperatorUtils.RowComparator(valueIndices, valueComparators);
return (o1, o2) -> {
for (int i = 0; i < numValuesToCompare; i++) {
int index = valueIndices[i];
Serializable v1 = o1[index];
Serializable v2 = o2[index];
int result;
switch (dataTypes[i]) {
case INT:
result = ((Integer) v1).compareTo((Integer) v2);
break;
case LONG:
result = ((Long) v1).compareTo((Long) v2);
break;
case FLOAT:
result = ((Float) v1).compareTo((Float) v2);
break;
case DOUBLE:
result = ((Double) v1).compareTo((Double) v2);
break;
case STRING:
result = ((String) v1).compareTo((String) v2);
break;
case BYTES:
result = ByteArray.compare((byte[]) v1, (byte[]) v2);
break;
default:
throw new IllegalStateException();
}
if (result != 0) {
return result * multipliers[i];
}
}
return 0;
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class SelectionPlanNode implements PlanNode {

private final IndexSegment _indexSegment;
private final Selection _selection;
private TransformPlanNode _transformPlanNode;
private final TransformPlanNode _transformPlanNode;

public SelectionPlanNode(IndexSegment indexSegment, BrokerRequest brokerRequest) {
_indexSegment = indexSegment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,17 @@ public TransformPlanNode(IndexSegment indexSegment, BrokerRequest brokerRequest)
* Helper method to extract projection columns and transform expressions from the given broker request.
*/
private void extractColumnsAndTransforms(BrokerRequest brokerRequest, IndexSegment indexSegment) {
Set<String> columns = new HashSet<>();
if (brokerRequest.isSetAggregationsInfo()) {
// Extract aggregation expressions
for (AggregationInfo aggregationInfo : brokerRequest.getAggregationsInfo()) {
if (!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
addExpressionColumn(AggregationFunctionUtils.getColumn(aggregationInfo));
columns.add(AggregationFunctionUtils.getColumn(aggregationInfo));
}
}

// Extract group-by expressions
if (brokerRequest.isSetGroupBy()) {
for (String column : brokerRequest.getGroupBy().getExpressions()) {
addExpressionColumn(column);
}
columns.addAll(brokerRequest.getGroupBy().getExpressions());
}
} else {
Selection selection = brokerRequest.getSelections();
Expand All @@ -83,9 +81,7 @@ private void extractColumnsAndTransforms(BrokerRequest brokerRequest, IndexSegme
_expressions.add(new TransformExpressionTree(new IdentifierAstNode(column)));
}
} else {
for (String column : selectionColumns) {
addExpressionColumn(column);
}
columns.addAll(selectionColumns);
}

// Extract order-by expressions and update maxDocPerNextCall
Expand All @@ -96,7 +92,10 @@ private void extractColumnsAndTransforms(BrokerRequest brokerRequest, IndexSegme
_maxDocPerNextCall = Math.min(selection.getSize(), _maxDocPerNextCall);
} else {
for (SelectionSort selectionSort : sortSequence) {
addExpressionColumn(selectionSort.getColumn());
String orderByColumn = selectionSort.getColumn();
if (!_projectionColumns.contains(orderByColumn)) {
columns.add(orderByColumn);
}
}
}
} else {
Expand All @@ -105,12 +104,11 @@ private void extractColumnsAndTransforms(BrokerRequest brokerRequest, IndexSegme
_maxDocPerNextCall = 1;
}
}
}

private void addExpressionColumn(String column) {
TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(column);
expression.getColumns(_projectionColumns);
_expressions.add(expression);
for (String column : columns) {
TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(column);
expression.getColumns(_projectionColumns);
_expressions.add(expression);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
*/
public class SelectionOperatorService {
private final List<String> _selectionColumns;
private final List<SelectionSort> _sortSequence;
private final DataSchema _dataSchema;
private final int _offset;
private final int _numRowsToKeep;
Expand All @@ -74,13 +73,12 @@ public class SelectionOperatorService {
*/
public SelectionOperatorService(Selection selection, DataSchema dataSchema) {
_selectionColumns = SelectionOperatorUtils.getSelectionColumns(selection.getSelectionColumns(), dataSchema);
_sortSequence = selection.getSelectionSortSequence();
_dataSchema = dataSchema;
// Select rows from offset to offset + size.
_offset = selection.getOffset();
_numRowsToKeep = _offset + selection.getSize();
_rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
getTypeCompatibleComparator());
getTypeCompatibleComparator(selection.getSelectionSortSequence()));
}

/**
Expand All @@ -89,9 +87,9 @@ public SelectionOperatorService(Selection selection, DataSchema dataSchema) {
*
* @return flexible {@link Comparator} for selection rows.
*/
private Comparator<Serializable[]> getTypeCompatibleComparator() {
private Comparator<Serializable[]> getTypeCompatibleComparator(List<SelectionSort> sortSequence) {
// Compare all single-value columns
int numOrderByExpressions = _sortSequence.size();
int numOrderByExpressions = sortSequence.size();
List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
for (int i = 0; i < numOrderByExpressions; i++) {
if (!_dataSchema.getColumnDataType(i).isArray()) {
Expand All @@ -101,21 +99,33 @@ private Comparator<Serializable[]> getTypeCompatibleComparator() {

int numValuesToCompare = valueIndexList.size();
int[] valueIndices = new int[numValuesToCompare];
Comparator[] valueComparators = new Comparator[numValuesToCompare];
boolean[] isNumber = new boolean[numValuesToCompare];
// Use multiplier -1 or 1 to control ascending/descending order
int[] multipliers = new int[numValuesToCompare];
for (int i = 0; i < numValuesToCompare; i++) {
int valueIndex = valueIndexList.get(i);
valueIndices[i] = valueIndex;
if (_dataSchema.getColumnDataType(i).isNumber()) {
valueComparators[i] = Comparator.comparingDouble(Number::doubleValue);
} else {
valueComparators[i] = Comparator.naturalOrder();
}
if (_sortSequence.get(valueIndex).isIsAsc()) {
valueComparators[i] = valueComparators[i].reversed();
}
isNumber[i] = _dataSchema.getColumnDataType(valueIndex).isNumber();
multipliers[i] = sortSequence.get(valueIndex).isIsAsc() ? -1 : 1;
}

return new SelectionOperatorUtils.RowComparator(valueIndices, valueComparators);
return (o1, o2) -> {
for (int i = 0; i < numValuesToCompare; i++) {
int index = valueIndices[i];
Serializable v1 = o1[index];
Serializable v2 = o2[index];
int result;
if (isNumber[i]) {
result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
} else {
result = ((String) v1).compareTo((String) v2);
}
if (result != 0) {
return result * multipliers[i];
}
}
return 0;
};
}

/**
Expand Down
Loading

0 comments on commit c90f663

Please sign in to comment.