Adding ability to pass options to GlobalIndexUidAggregator #2643

Open
wants to merge 19 commits into
base: integration
Changes from 15 commits (19 commits total)
c023af2
Adding ability to pass options to GlobalIndexUidAggregator
mineralntl Nov 18, 2024
216f24c
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Dec 4, 2024
191ce49
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Dec 4, 2024
4859531
Fixing botched merge
mineralntl Dec 4, 2024
6926c2d
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Dec 6, 2024
12fa266
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Dec 6, 2024
759fdaa
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Dec 6, 2024
b7b4003
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Dec 9, 2024
63bde27
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Dec 11, 2024
1d64886
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Dec 12, 2024
705cfd5
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Dec 16, 2024
0d9806f
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Dec 18, 2024
621c179
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Dec 31, 2024
b60c330
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Jan 8, 2025
752e046
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Jan 10, 2025
6a30e1d
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Jan 15, 2025
61d2aa6
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Jan 17, 2025
175b190
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Jan 22, 2025
d934073
Merge branch 'integration' into feature/passCombinerOptions
mineralntl Jan 27, 2025
@@ -380,7 +380,9 @@ public CustomColumnToClassMapping(Integer priority, Map<String,String> opts) {
for (Entry<String,String> entry : opts.entrySet()) {
String column = entry.getKey();

final String className = entry.getValue();
final String val = entry.getValue().trim();
int spaceIdx = val.indexOf(' ');
final String className = spaceIdx < 0 ? val : val.substring(0, spaceIdx);

Pair<Text,Text> pcic;
if (ALL_CF_STR.equals(column)) {
@@ -397,6 +399,17 @@ public CustomColumnToClassMapping(Integer priority, Map<String,String> opts) {

agg = clazz.getDeclaredConstructor().newInstance();

//@formatter:off
if (spaceIdx > 0) {
final String encodedOpts = val.substring(spaceIdx + 1);
Map<String,String> aggOpts = Splitter
.on(';')
.trimResults()
.withKeyValueSeparator('=')
.split(encodedOpts);
agg.validateOptions(aggOpts);
}
//@formatter:on
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException | InvocationTargetException e) {
throw new RuntimeException(e);
}
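The mapping above accepts option values of the form `<ClassName> key=value;key=value`: everything before the first space is the aggregator class, and the trailing pairs are handed to the aggregator's `validateOptions`. A stand-alone sketch of that parsing — plain `String` operations stand in for the Guava `Splitter` chain the patch uses, and the class name below is only illustrative:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CombinerOptionParser {

    /** Returns the class-name portion of "<ClassName> k1=v1;k2=v2". */
    static String className(String value) {
        String val = value.trim();
        int spaceIdx = val.indexOf(' ');
        return spaceIdx < 0 ? val : val.substring(0, spaceIdx);
    }

    /** Returns the trailing ';'-separated key=value pairs, or an empty map when none are present. */
    static Map<String,String> options(String value) {
        Map<String,String> opts = new LinkedHashMap<>();
        String val = value.trim();
        int spaceIdx = val.indexOf(' ');
        if (spaceIdx > 0) {
            for (String pair : val.substring(spaceIdx + 1).split(";")) {
                int eq = pair.indexOf('=');
                opts.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return opts;
    }

    public static void main(String[] args) {
        String value = "datawave.ingest.table.aggregator.GlobalIndexUidAggregator all=true;maxuids=2";
        System.out.println(className(value)); // the combiner class name
        System.out.println(options(value));   // {all=true, maxuids=2}
    }
}
```

Values without a space (the pre-existing format) fall through unchanged, so the new syntax is backward compatible.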
@@ -23,9 +23,25 @@
/**
* Implementation of an Aggregator that aggregates objects of the type Uid.List. This is an optimization for the shardIndex and shardReverseIndex, where the
* list of UIDs for events will be maintained in the global index for low cardinality terms.
* <p>
* Although this combiner allows the maximum number of UIDs kept to be configured, anyone using this feature should consider the impact of changing it once data
* has been loaded into the system. Decreasing the max will likely cause UID lists to be purged as they exceed the new max UID count. Increasing the max UID
* count is also unlikely to work as one would expect, since any lists that already had their UIDs purged and the ignore flag set won't resume collecting UIDs
* even if their previous count is less than the new max. In practice, the main intent of this feature is to configure a new cluster with a different max UID
* count (without having to re-compile the code). With the caveats mentioned, it could be used on a system with data loaded, for example if a new data type
* becomes available and one wished to increase the max UID count while data for other data types is relatively stable or the side effects of the change don't
* matter.
* <p>
* When this class is used with {@link datawave.iterators.PropogatingIterator}, be aware that it is not actually used as a combiner; only its {@link #reset()}
* and {@link #aggregate()} methods are invoked. PropogatingIterator allows combiner options to be passed, which means the max UID count can be changed when
* this class is used with it. If that option is used, note that the base {@link org.apache.accumulo.core.iterators.Combiner} option validation is applied, so
* either the "all" option must be set to "true" or the "columns" option must be set in order to pass validation. Although set, the "all" or "columns" option
* has no effect when used with PropogatingIterator since only {@link #reset()} and {@link #aggregate()} are invoked.
*/
public class GlobalIndexUidAggregator extends PropogatingCombiner {
private static final Logger log = LoggerFactory.getLogger(GlobalIndexUidAggregator.class);
private static final String MAX_UIDS_OPT = "maxuids";
private static final String TIMESTAMPS_IGNORED = "timestampsIgnored";

/**
* Using a set instead of a list so that duplicate UIDs are filtered out of the list. This might happen in the case of rows with masked fields that share a
@@ -132,7 +148,7 @@ public Value aggregate() {
* can store up to a certain number of UIDs and after that, the lists are no longer tracked (the ignore flag will be set) and only counts are tracked.
* REMOVEDUIDs are tracked to handle minor and partial major compactions where this reduce method won't necessarily see all possible values for a given key
* (e.g., the UIDs that are being removed might be in a different RFile that isn't involved in the current compaction).
*
* <p>
* Aggregation operates in one of two modes depending on whether or not timestamps are ignored. By default, timestamps are ignored since DataWave uses date
* to the day as the timestamp in the global term index. When timestamps are ignored, we cannot infer anything about the order of values under aggregation.
* Therefore, a decision must be made about how to handle removed UIDs vs added UIDs. In that case, removed UIDs take priority. This means that adding a
@@ -319,9 +335,24 @@ public boolean propogateKey() {
return propogate || !uids.isEmpty() || count > 0;
}

@Override
public IteratorOptions describeOptions() {
IteratorOptions io = super.describeOptions();
io.addNamedOption(MAX_UIDS_OPT, "The maximum number of UIDs to keep in the list. Default is " + MAX + ".");
return io;
}

@Override
public boolean validateOptions(Map<String,String> options) {
boolean valid = super.validateOptions(options);
if (valid && options.containsKey(MAX_UIDS_OPT)) {
int parsedMaxUids = Integer.parseInt(options.get(MAX_UIDS_OPT));
if (parsedMaxUids <= 0) {
throw new IllegalArgumentException("Max UIDs must be greater than 0.");
}
maxUids = parsedMaxUids;
}
return valid;
}

@@ -336,5 +367,12 @@ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
super.init(source, options, env);
if (options.containsKey(MAX_UIDS_OPT)) {
maxUids = Integer.parseInt(options.get(MAX_UIDS_OPT));
}
}

public static void setMaxUidsOpt(IteratorSetting is, int maxUids) {
is.addOption(MAX_UIDS_OPT, Integer.toString(maxUids));
}
}
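The `maxuids` handling added above follows one rule in both `init` and `validateOptions`: when the option is present it must parse to a positive integer, otherwise the compiled-in default applies. A minimal sketch of that rule outside Accumulo — the class and method names here are illustrative, not part of the patch:

```java
import java.util.Map;

public class MaxUidsValidation {

    static final String MAX_UIDS_OPT = "maxuids"; // same option key as the patch

    /** Mirrors the patch's rule: "maxuids", when present, must be a positive int; otherwise use the default. */
    static int resolveMaxUids(Map<String,String> options, int defaultMax) {
        String raw = options.get(MAX_UIDS_OPT);
        if (raw == null) {
            return defaultMax;
        }
        int maxUids = Integer.parseInt(raw);
        if (maxUids <= 0) {
            throw new IllegalArgumentException("Max UIDs must be greater than 0.");
        }
        return maxUids;
    }

    public static void main(String[] args) {
        // "all=true" satisfies the base Combiner validation but plays no part in the max-UID rule.
        System.out.println(resolveMaxUids(Map.of("all", "true", "maxuids", "2"), 20)); // 2
        System.out.println(resolveMaxUids(Map.of(), 20)); // 20, the default
    }
}
```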
@@ -22,16 +23,17 @@
import org.apache.log4j.Logger;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Maps;

import datawave.ingest.table.aggregator.PropogatingCombiner;

/**
* Purpose: Handle arbitrary propogating aggregations.
*
* <p>
* Design: Though very similar to the DeletingIterator, due to private methods and members, we cannot directly extend the DeletingIterator. As a result, the
* class extends SKVI. This class {@code USES --> PropogatingAggregator}. Note that propAgg can be null
*
* <p>
* Initially the TotalAggregatingIterator, this class was a direct copy. At some point it was identified that there was an artifact where deletes would not be
* propogated. As a result, this class becomes nearly identical to the DeletingIterator, whereby deletes are always propogated until a full major compaction.
*/
Expand All @@ -41,7 +42,7 @@ public class PropogatingIterator implements SortedKeyValueIterator<Key,Value>, O

public static final String ATTRIBUTE_DESCRIPTION = "Aggregators apply aggregating functions to values with identical keys. You can specify the column family. DEFAULT matches the default locality group";

public static final String UNNAMED_OPTION_DESCRIPTION = "<Column Family> <Combiner>";
public static final String UNNAMED_OPTION_DESCRIPTION = "<Column Family> <Combiner> <optional: combOpt1=combVal1;combOpt2=combVal2...>";

public static final String AGGREGATOR_DEFAULT = "DEFAULT";

@@ -82,12 +83,7 @@ public class PropogatingIterator implements SortedKeyValueIterator<Key,Value>, O
protected Map<ByteSequence,PropogatingCombiner> aggMap;

/**
* variable to determine if we should propogate deletes
*/
private boolean shouldPropogate;

/**
* Combiner options so that we can effectively deep copy
* Combiner options so that we can effectively deepCopy
*/
protected Map<String,String> options = Maps.newHashMap();

@@ -127,10 +123,8 @@ public PropogatingIterator() {
* Aggregates the same partial key.
*
* @return a partial key
* @throws IOException
* for issues with read/write
*/
private boolean aggregateRowColumn() throws IOException {
private boolean aggregateRowColumn() {
// this function assumes that first value is not delete

workKey.set(iterator.getTopKey());
@@ -195,12 +189,9 @@ private PropogatingCombiner getAggregator(Key key) {
}

/**
* Find Top method, will attempt to aggregate, iff an aggregator is specified
*
* @throws IOException
* for issues with read/write
* Find top method; attempts to aggregate if an aggregator is specified
*/
private void findTop() throws IOException {
private void findTop() {
// check if aggregation is needed
while (iterator.hasTop() && !aggregateRowColumn())
;
@@ -214,10 +205,8 @@ private void findTop() {
* an iterator
* @param Aggregators
* mapping of aggregators
* @throws IOException
* for issues with read/write
*/
public PropogatingIterator(SortedKeyValueIterator<Key,Value> iterator, ColumnToClassMapping<Combiner> Aggregators) throws IOException {
public PropogatingIterator(SortedKeyValueIterator<Key,Value> iterator, ColumnToClassMapping<Combiner> Aggregators) {
this.iterator = iterator;
findTop();
}
@@ -305,7 +294,7 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op
@Override
public IteratorOptions describeOptions() {

return new IteratorOptions(ATTRIBUTE_NAME, ATTRIBUTE_DESCRIPTION, defaultMapOptions, Collections.singletonList("<ColumnFamily> <Combiner>"));
return new IteratorOptions(ATTRIBUTE_NAME, ATTRIBUTE_DESCRIPTION, defaultMapOptions, Collections.singletonList(UNNAMED_OPTION_DESCRIPTION));
}

@Override
@@ -317,24 +306,36 @@ public boolean validateOptions(Map<String,String> options) {
// Don't propagate for either scan or full major compaction. In either case, the aggregated result has combined
// all existing values for a key so we don't need to propagate temporary state that is only used to combine
// partial results with new info.
shouldPropogate = !(env.getIteratorScope() == IteratorScope.majc && env.isFullMajorCompaction()) && !(env.getIteratorScope() == IteratorScope.scan);

PropogatingCombiner propAgg = null;

for (Entry<String,String> familyOption : options.entrySet()) {
Object agg = createAggregator(familyOption.getValue());
boolean shouldPropogate = !(env.getIteratorScope() == IteratorScope.majc && env.isFullMajorCompaction())
&& !(env.getIteratorScope() == IteratorScope.scan);

options.forEach((name, value) -> {
value = value.trim();
int sepIdx = value.indexOf(' ');
String aggClass = (sepIdx < 0) ? value : value.substring(0, sepIdx);
Object agg = createAggregator(aggClass);
if (agg instanceof PropogatingCombiner) {
propAgg = PropogatingCombiner.class.cast(agg);
PropogatingCombiner propAgg = (PropogatingCombiner) agg;
if (sepIdx > 0) {
String encodedOpts = value.substring(sepIdx + 1);
//@formatter:off
Map<String,String> aggOpts = Splitter
.on(';')
.trimResults()
.withKeyValueSeparator('=')
.split(encodedOpts);
//@formatter:on
propAgg.validateOptions(aggOpts);
}
propAgg.setPropogate(shouldPropogate);
if (familyOption.getKey().equals(AGGREGATOR_DEFAULT) || familyOption.getKey().equals(AGGREGATOR_DEFAULT_OPT)) {
if (log.isTraceEnabled())
log.debug("Default aggregator is " + propAgg.getClass());
if (name.equals(AGGREGATOR_DEFAULT) || name.equals(AGGREGATOR_DEFAULT_OPT)) {
if (log.isTraceEnabled())
log.trace("Default aggregator is " + propAgg.getClass());
defaultAgg = propAgg;
} else {
aggMap.put(new ArrayByteSequence(familyOption.getKey().getBytes()), propAgg);
aggMap.put(new ArrayByteSequence(name.getBytes()), propAgg);
}
}
}
});
return true;
}
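A client drives the parsing above by setting an iterator option whose key is the column family (or `DEFAULT`) and whose value is the combiner class followed by the encoded options, as the PR's new test does with `all=true;maxuids=2`. A small sketch of building that value — the helper class and method names are illustrative, not part of the patch:

```java
import java.util.HashMap;
import java.util.Map;

public class PropogatingIteratorOptionsExample {

    /** Builds the "<Combiner class> k1=v1;k2=v2" value that PropogatingIterator parses (format from this patch). */
    static String combinerOption(String combinerClass, Map<String,String> opts) {
        StringBuilder sb = new StringBuilder(combinerClass);
        String sep = " "; // one space before the first pair, ';' between the rest
        for (Map.Entry<String,String> e : opts.entrySet()) {
            sb.append(sep).append(e.getKey()).append('=').append(e.getValue());
            sep = ";";
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String,String> opts = new java.util.LinkedHashMap<>();
        opts.put("all", "true");   // required to pass the base Combiner option validation
        opts.put("maxuids", "2");  // forwarded to GlobalIndexUidAggregator.validateOptions

        Map<String,String> iterOpts = new HashMap<>();
        iterOpts.put("DEFAULT", combinerOption("datawave.ingest.table.aggregator.GlobalIndexUidAggregator", opts));
        System.out.println(iterOpts.get("DEFAULT"));
    }
}
```

A map built this way is what the test passes to `PropogatingIterator.init`; omitting the options map reproduces the old `<Column Family> <Combiner>` form unchanged.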

@@ -43,12 +43,19 @@ public class PropogatingIteratorTest {
private static final String FIELD_TO_AGGREGATE = "UUID";
private static final long TIMESTAMP = 1349541830;

private void validateOverfullUidList(Value topValue, int count) throws InvalidProtocolBufferException {
Uid.List v = Uid.List.parseFrom(topValue.get());

Assert.assertEquals(count, v.getCOUNT());
Assert.assertEquals(0, v.getUIDList().size());
}

private void validateUids(Value topValue, String... uids) throws InvalidProtocolBufferException {
Uid.List v = Uid.List.parseFrom(topValue.get());

Assert.assertEquals(uids.length, v.getCOUNT());
for (String uid : uids) {
assertTrue(v.getUIDList().contains(uid));
Assert.assertTrue(uid + " missing from UIDs list", v.getUIDList().contains(uid));
}
}

Expand All @@ -57,7 +64,7 @@ private void validateRemoval(Value topValue, String... uids) throws InvalidProto

Assert.assertEquals(-uids.length, v.getCOUNT());
for (String uid : uids) {
assertTrue(v.getREMOVEDUIDList().contains(uid));
Assert.assertTrue(uid + " missing from Removed UIDs list", v.getREMOVEDUIDList().contains(uid));
}
}

@@ -262,6 +269,44 @@ private SortedMultiMapIterator createSourceWithTestData() {
return new SortedMultiMapIterator(map);
}

@Test
public void testAggregateOptions() throws IOException {
TreeMultimap<Key,Value> map = TreeMultimap.create();

map.put(newKey(SHARD, FIELD_TO_AGGREGATE, "abc"), new Value(createValueWithUid("abc.1").build().toByteArray()));
map.put(newKey(SHARD, FIELD_TO_AGGREGATE, "abc"), new Value(createValueWithUid("abc.2").build().toByteArray()));
map.put(newKey(SHARD, FIELD_TO_AGGREGATE, "abc"), new Value(createValueWithUid("abc.3").build().toByteArray()));
map.put(newKey(SHARD, FIELD_TO_AGGREGATE, "abc"), new Value(createValueWithUid("abc.4").build().toByteArray()));

map.put(newKey(SHARD, FIELD_TO_AGGREGATE, "abd"), new Value(createValueWithUid("abc.3").build().toByteArray()));

SortedMultiMapIterator data = new SortedMultiMapIterator(map);

PropogatingIterator iter = new PropogatingIterator();
Map<String,String> options = Maps.newHashMap();

String encodedOptions = "all=true;maxuids=2";
options.put(PropogatingIterator.AGGREGATOR_DEFAULT, GlobalIndexUidAggregator.class.getCanonicalName() + " " + encodedOptions);

IteratorEnvironment env = new MockIteratorEnvironment(false);

iter.init(data, options, env);

iter.seek(new Range(), Collections.emptyList(), false);

Assert.assertTrue(iter.hasTop());

Key topKey = iter.getTopKey();

Assert.assertEquals(newKey(SHARD, FIELD_TO_AGGREGATE, "abc"), topKey);
validateOverfullUidList(iter.getTopValue(), 4);
iter.next();
topKey = iter.getTopKey();
Assert.assertEquals(newKey(SHARD, FIELD_TO_AGGREGATE, "abd"), topKey);
validateUids(iter.getTopValue(), "abc.3");

}

@Test(expected = NullPointerException.class)
public void testNullEnvironment() throws IOException {
PropogatingIterator iter = new PropogatingIterator();