Add support to purge data considering shardId #1811

Open
wants to merge 1 commit into master
@@ -50,6 +50,7 @@
import io.siddhi.query.api.execution.query.selection.Selector;
import io.siddhi.query.api.expression.Expression;
import io.siddhi.query.api.expression.Variable;
import io.siddhi.query.api.expression.condition.And;
import io.siddhi.query.api.expression.condition.Compare;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -63,6 +64,7 @@
import java.util.concurrent.TimeUnit;

import static io.siddhi.core.util.SiddhiConstants.AGG_EXTERNAL_TIMESTAMP_COL;
import static io.siddhi.core.util.SiddhiConstants.AGG_SHARD_ID_COL;
import static io.siddhi.core.util.SiddhiConstants.AGG_START_TIMESTAMP_COL;
import static io.siddhi.query.api.expression.Expression.Time.normalizeDuration;
import static io.siddhi.query.api.expression.Expression.Time.timeToLong;
@@ -89,7 +91,7 @@ public class IncrementalDataPurger implements Runnable {
private Map<TimePeriod.Duration, Long> minimumDurationMap = new EnumMap<>(TimePeriod.Duration.class);
private ComplexEventChunk<StateEvent> eventChunk = new ComplexEventChunk<>();
private List<VariableExpressionExecutor> variableExpressionExecutorList = new ArrayList<>();
private Attribute aggregatedTimestampAttribute;
private List<Attribute> deleteQueryAttributes = new ArrayList<>();
private Map<TimePeriod.Duration, CompiledCondition> compiledConditionsHolder =
new EnumMap<>(TimePeriod.Duration.class);
private Map<String, Table> tableMap = new HashMap<>();
@@ -101,29 +103,42 @@ public class IncrementalDataPurger implements Runnable {
private boolean purgingHalted = false;
private String errorMessage;

private String shardId;
private boolean enablePartitioning;
private boolean purgedBySharedId = false;

public void init(AggregationDefinition aggregationDefinition, StreamEventFactory streamEventFactory,
Map<TimePeriod.Duration, Table> aggregationTables, Boolean isProcessingOnExternalTime,
SiddhiQueryContext siddhiQueryContext, List<TimePeriod.Duration> activeIncrementalDurations,
String timeZone, Map<String, Window> windowMap, Map<String, AggregationRuntime>
aggregationMap) {
aggregationMap, String shardId, boolean enablePartitioning, boolean purgedBySharedId) {
this.siddhiQueryContext = siddhiQueryContext;
this.aggregationDefinition = aggregationDefinition;
List<Annotation> annotations = aggregationDefinition.getAnnotations();
this.streamEventFactory = streamEventFactory;
this.aggregationTables = aggregationTables;
this.shardId = shardId;
this.enablePartitioning = enablePartitioning;
this.activeIncrementalDurations = activeIncrementalDurations;
this.windowMap = windowMap;
this.aggregationMap = aggregationMap;
this.purgedBySharedId = purgedBySharedId;
if (isProcessingOnExternalTime) {
purgingTimestampField = AGG_EXTERNAL_TIMESTAMP_COL;
} else {
purgingTimestampField = AGG_START_TIMESTAMP_COL;
}
aggregatedTimestampAttribute = new Attribute(purgingTimestampField, Attribute.Type.LONG);
Attribute aggregatedTimestampAttribute = new Attribute(purgingTimestampField, Attribute.Type.LONG);
deleteQueryAttributes.add(aggregatedTimestampAttribute);

VariableExpressionExecutor variableExpressionExecutor = new VariableExpressionExecutor(
aggregatedTimestampAttribute, 0, 1);
variableExpressionExecutorList.add(variableExpressionExecutor);
if (enablePartitioning && shardId != null) {
VariableExpressionExecutor shardIdExpressionExecutor = new VariableExpressionExecutor(
new Attribute(AGG_SHARD_ID_COL, Attribute.Type.STRING), 0, 1);
variableExpressionExecutorList.add(shardIdExpressionExecutor);
}
for (Map.Entry<TimePeriod.Duration, Table> entry : aggregationTables.entrySet()) {
this.tableMap.put(entry.getValue().getTableDefinition().getId(), entry.getValue());
switch (entry.getKey()) {
@@ -174,6 +189,10 @@ public void init(AggregationDefinition aggregationDefinition, StreamEventFactory
String interval = purge.getElement(SiddhiConstants.ANNOTATION_ELEMENT_INTERVAL);
purgeExecutionInterval = timeToLong(interval);
}
if (purge.getElement(SiddhiConstants.PURGE_BY_SHARD_ID_ENABLED) != null) {
purgedBySharedId = Boolean.parseBoolean(purge
.getElement(SiddhiConstants.PURGE_BY_SHARD_ID_ENABLED));
}
List<Annotation> retentions = purge.getAnnotations(SiddhiConstants.NAMESPACE_RETENTION_PERIOD);
if (retentions != null && !retentions.isEmpty()) {
Annotation retention = retentions.get(0);
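The new purgeByShardIdEnabled element is read here from the @purge annotation. A minimal sketch of how an aggregation could opt in per definition (the stream, store settings, and retention values are illustrative, not taken from this PR):

    // Hypothetical Siddhi app: 'purgeByShardIdEnabled' sits alongside the
    // existing 'enable' and 'interval' elements of @purge.
    String app =
            "define stream TradeStream (symbol string, price double, volume long);\n" +
            "@store(type = 'rdbms', jdbc.url = 'jdbc:h2:mem:agg', username = 'sa',\n" +
            "       password = '', jdbc.driver.name = 'org.h2.Driver')\n" +
            "@purge(enable = 'true', interval = '10 sec', purgeByShardIdEnabled = 'true',\n" +
            "       @retentionPeriod(sec = '120 sec', min = '24 hours', hours = '30 days',\n" +
            "                        days = '1 year', months = 'all', years = 'all'))\n" +
            "define aggregation TradeAggregation\n" +
            "from TradeStream\n" +
            "select symbol, avg(price) as avgPrice, sum(volume) as totalVolume\n" +
            "group by symbol\n" +
            "aggregate every sec ... year;";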
@@ -200,6 +219,13 @@ public void init(AggregationDefinition aggregationDefinition, StreamEventFactory
}
}
}
if (purgedBySharedId && enablePartitioning && shardId != null) {
Attribute shardIdAttribute = new Attribute(AGG_SHARD_ID_COL, Attribute.Type.STRING);
VariableExpressionExecutor shardIdExpressionExecutor = new VariableExpressionExecutor(
shardIdAttribute, 0, 1);
variableExpressionExecutorList.add(shardIdExpressionExecutor);
deleteQueryAttributes.add(shardIdAttribute);
}
compiledConditionsHolder = createCompileConditions(aggregationTables, tableMap);
}

@@ -218,7 +244,12 @@ public void run() {
boolean isSafeToRunPurgingTask = false;
long currentTime = System.currentTimeMillis();
long purgeTime;
Object[] purgeTimeArray = new Object[1];
Object[] purgeTimeArray;
if (purgedBySharedId && enablePartitioning && shardId != null) {
purgeTimeArray = new Object[2];
} else {
purgeTimeArray = new Object[1];
}
int i = 1;
if (purgingHalted) {
LOG.error(errorMessage);
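The delete-parameter array is sized to mirror deleteQueryAttributes: index 0 carries the retention timestamp (populated in unchanged code outside this hunk) and index 1, when present, the shard id. A condensed sketch of that layout (helper name hypothetical):

    // Order must mirror deleteQueryAttributes: [0] timestamp, [1] shardId.
    static Object[] buildPurgeParameters(long purgeTime, String shardId, boolean purgeByShard) {
        return (purgeByShard && shardId != null)
                ? new Object[]{purgeTime, shardId}
                : new Object[]{purgeTime};
    }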
@@ -248,13 +279,17 @@
}
if (isNeededToExecutePurgeTask) {
if (isSafeToRunPurgingTask) {
if (purgedBySharedId && enablePartitioning && shardId != null) {
purgeTimeArray[1] = shardId;
}
StateEvent secEvent = createStreamEvent(purgeTimeArray, currentTime);
eventChunk.add(secEvent);
Table table = aggregationTables.get(duration);
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Purging data of table: " + table.getTableDefinition().getId() + " with a" +
" retention of timestamp : " + purgeTime);
" retention of timestamp : " + purgeTime + ", delete query event chunck " +
eventChunk.toString());
}
table.deleteEvents(eventChunk, compiledConditionsHolder.get(duration), 1);
} catch (RuntimeException e) {
@@ -282,18 +317,20 @@ public void run() {
/**
* Building the MatchingMetaInfoHolder for delete records
**/
private MatchingMetaInfoHolder matchingMetaInfoHolder(Table table, Attribute attribute) {
private MatchingMetaInfoHolder matchingMetaInfoHolder(Table table, List<Attribute> attributes) {
MetaStateEvent metaStateEvent = new MetaStateEvent(2);
MetaStreamEvent metaStreamEventWithDeletePara = new MetaStreamEvent();
MetaStreamEvent metaStreamEventForTable = new MetaStreamEvent();
TableDefinition deleteTableDefinition = TableDefinition.id("");
deleteTableDefinition.attribute(attribute.getName(), attribute.getType());
for (Attribute attribute: attributes) {
deleteTableDefinition.attribute(attribute.getName(), attribute.getType());
metaStreamEventWithDeletePara.addOutputData(attribute);
}
metaStreamEventWithDeletePara.setEventType(MetaStreamEvent.EventType.TABLE);
metaStreamEventWithDeletePara.addOutputData(attribute);
metaStreamEventWithDeletePara.addInputDefinition(deleteTableDefinition);
metaStreamEventForTable.setEventType(MetaStreamEvent.EventType.TABLE);
for (Attribute attributes : table.getTableDefinition().getAttributeList()) {
metaStreamEventForTable.addOutputData(attributes);
for (Attribute tableAttribute : table.getTableDefinition().getAttributeList()) {
metaStreamEventForTable.addOutputData(tableAttribute);
}
metaStreamEventForTable.addInputDefinition(table.getTableDefinition());
metaStateEvent.addEvent(metaStreamEventWithDeletePara);
@@ -316,10 +353,17 @@ private Map<TimePeriod.Duration, CompiledCondition> createCompileConditions(
table = aggregationTables.get(entry.getKey());
Variable leftVariable = new Variable(purgingTimestampField);
leftVariable.setStreamId(entry.getValue().getTableDefinition().getId());
Compare expression = new Compare(leftVariable,
Expression fullExpression = new Compare(leftVariable,
Compare.Operator.LESS_THAN, new Variable(purgingTimestampField));
compiledCondition = table.compileCondition(expression,
matchingMetaInfoHolder(table, aggregatedTimestampAttribute), variableExpressionExecutorList,
if (purgedBySharedId && enablePartitioning && shardId != null) {
Variable shardIdVariable = new Variable(AGG_SHARD_ID_COL);
shardIdVariable.setStreamId(entry.getValue().getTableDefinition().getId());
fullExpression = new And(new Compare(
shardIdVariable, Compare.Operator.EQUAL, new Variable(AGG_SHARD_ID_COL)
), fullExpression);
}
compiledCondition = table.compileCondition(fullExpression,
matchingMetaInfoHolder(table, deleteQueryAttributes), variableExpressionExecutorList,
tableMap, siddhiQueryContext);
compiledConditionMap.put(entry.getKey(), compiledCondition);
}
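When shard-aware purging is active, the compiled condition becomes a conjunction of the shard equality and the existing timestamp bound; an RDBMS store would render it as roughly DELETE FROM <table> WHERE SHARD_ID = ? AND <timestamp column> < ?. A standalone sketch of the expression tree built above (identifiers illustrative; imports as shown in this file's diff):

    // Left-hand variables are bound to the store table via setStreamId();
    // right-hand variables become runtime parameters filled from the
    // delete-parameter array (purge timestamp, then shardId).
    static Expression deleteCondition(String tableId, String timestampCol, String shardIdCol) {
        Variable tableTimestamp = new Variable(timestampCol);
        tableTimestamp.setStreamId(tableId);
        Expression olderThanRetention = new Compare(tableTimestamp,
                Compare.Operator.LESS_THAN, new Variable(timestampCol));

        Variable tableShardId = new Variable(shardIdCol);
        tableShardId.setStreamId(tableId);
        Expression sameShard = new Compare(tableShardId,
                Compare.Operator.EQUAL, new Variable(shardIdCol));

        return new And(sameShard, olderThanRetention);
    }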
@@ -388,6 +432,10 @@ private Map<String, Boolean> isSafeToPurgeTheDuration(long purgeTime, Table pare
onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
OnDemandQueryRuntime onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null,
siddhiQueryContext.getSiddhiAppContext(), tableMap, windowMap, aggregationMap);
if (LOG.isDebugEnabled()) {
LOG.debug("Query to check the data exists in parent table before deletion " +
"" + onDemandQuery);
}
dataInParentTable = onDemandQueryRuntime.execute();
}
purgingCheckState.put(IS_DATA_AVAILABLE_TO_PURGE, dataToDelete != null && dataToDelete.length > 0);
@@ -422,26 +470,55 @@ private OnDemandQuery getOnDemandQuery(Table table, long timeFrom, long timeTo)
.orderBy(Expression.variable(purgingTimestampField), OrderByAttribute.Order.DESC)
.limit(Expression.value(1));
InputStore inputStore;
Expression shardIdEqualExpression = Expression.compare(
Expression.variable(AGG_SHARD_ID_COL),
Compare.Operator.EQUAL,
Expression.value(shardId)
);
if (timeTo != 0) {
inputStore = InputStore.store(table.getTableDefinition().getId()).
on(Expression.and(
Expression.compare(
Expression.variable(purgingTimestampField),
Compare.Operator.GREATER_THAN_EQUAL,
Expression.value(timeFrom)
),
Expression.compare(
Expression.variable(purgingTimestampField),
Compare.Operator.LESS_THAN_EQUAL,
Expression.value(timeTo)
)
));
if (purgedBySharedId && enablePartitioning && shardId != null) {
inputStore = InputStore.store(table.getTableDefinition().getId()).
on(Expression.and(Expression.and(
Expression.compare(
Expression.variable(purgingTimestampField),
Compare.Operator.GREATER_THAN_EQUAL,
Expression.value(timeFrom)
),
Expression.compare(
Expression.variable(purgingTimestampField),
Compare.Operator.LESS_THAN_EQUAL,
Expression.value(timeTo)
)
), shardIdEqualExpression));
} else {
inputStore = InputStore.store(table.getTableDefinition().getId()).
on(Expression.and(
Expression.compare(
Expression.variable(purgingTimestampField),
Compare.Operator.GREATER_THAN_EQUAL,
Expression.value(timeFrom)
),
Expression.compare(
Expression.variable(purgingTimestampField),
Compare.Operator.LESS_THAN_EQUAL,
Expression.value(timeTo)
)
));
}
} else {
inputStore = InputStore.store(table.getTableDefinition().getId()).on(Expression.compare(
Expression.variable(purgingTimestampField),
Compare.Operator.LESS_THAN_EQUAL,
Expression.value(timeFrom)
));
if (purgedBySharedId && enablePartitioning && shardId != null) {
inputStore = InputStore.store(table.getTableDefinition().getId()).on(Expression.and(
Expression.compare(Expression.variable(purgingTimestampField),
Compare.Operator.LESS_THAN_EQUAL,
Expression.value(timeFrom)), shardIdEqualExpression
));
} else {
inputStore = InputStore.store(table.getTableDefinition().getId()).on(Expression.compare(
Expression.variable(purgingTimestampField),
Compare.Operator.LESS_THAN_EQUAL,
Expression.value(timeFrom)
));
}
}

return OnDemandQuery.query().from(inputStore).select(selector);
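The four inputStore branches above differ only in whether the shard equality is AND-ed onto the time condition; a hypothetical helper (not part of this PR) shows that composition, reusing the Expression, Compare, and AGG_SHARD_ID_COL imports already in this file:

    // Appends the shard filter to a base time condition when shard-scoped
    // purging is active; otherwise returns the base condition unchanged.
    static Expression withShardScope(Expression base, boolean purgeByShard, String shardId) {
        if (!purgeByShard || shardId == null) {
            return base;
        }
        return Expression.and(base, Expression.compare(
                Expression.variable(AGG_SHARD_ID_COL),
                Compare.Operator.EQUAL,
                Expression.value(shardId)));
    }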
@@ -500,6 +577,10 @@ private Map<String, Long> getPurgingValidationTimeDurations(TimePeriod.Duration
Event[] dataToDelete(long purgingTime, Table table) {
OnDemandQuery onDemandQuery = getOnDemandQuery(table, purgingTime, 0);
onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
if (LOG.isDebugEnabled()) {
LOG.debug("Query to check whether the data exists in the given table which are older that " +
"data retention rule " + onDemandQuery);
}
OnDemandQueryRuntime onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null,
siddhiQueryContext.getSiddhiAppContext(), tableMap, windowMap, aggregationMap);
return onDemandQueryRuntime.execute();
@@ -126,6 +126,8 @@ public final class SiddhiConstants {
public static final String TRANSPORT_CHANNEL_CREATION_IDENTIFIER = "transportChannelCreationEnabled";

public static final String NAMESPACE_PURGE = "purge";

public static final String PURGE_BY_SHARD_ID_ENABLED = "purgeByShardIdEnabled";
public static final String NAMESPACE_RETENTION_PERIOD = "retentionPeriod";

public static final String PARTITION_ID_DEFAULT = "null";
@@ -136,6 +136,7 @@
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_ON_CONDITION;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_SELECTORS;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_TABLE_NAME;
import static io.siddhi.core.util.SiddhiConstants.PURGE_BY_SHARD_ID_ENABLED;
import static io.siddhi.core.util.SiddhiConstants.SQL_AND;
import static io.siddhi.core.util.SiddhiConstants.SQL_AS;
import static io.siddhi.core.util.SiddhiConstants.SQL_FROM;
@@ -270,6 +271,8 @@ public static AggregationRuntime parse(AggregationDefinition aggregationDefiniti
final boolean isDistributed;
ConfigManager configManager = siddhiAppContext.getSiddhiContext().getConfigManager();
final String shardId = configManager.extractProperty("shardId");
final boolean purgedBySharedId = Boolean.parseBoolean(configManager
.extractProperty(PURGE_BY_SHARD_ID_ENABLED));
boolean enablePartitioning = false;
// check if the setup is Active Active(distributed deployment) by checking availability of partitionById
// config
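The flag can also arrive through the deployment configuration: extractProperty(PURGE_BY_SHARD_ID_ENABLED) reads the same 'purgeByShardIdEnabled' key the new constant defines. A minimal sketch using siddhi-core's InMemoryConfigManager (property values illustrative, assuming the two-argument test-style constructor):

    import io.siddhi.core.SiddhiManager;
    import io.siddhi.core.util.config.InMemoryConfigManager;
    import java.util.HashMap;
    import java.util.Map;

    public class ShardPurgeConfigSketch {
        public static void main(String[] args) {
            // The three properties this parser consults for shard-scoped purging.
            Map<String, String> properties = new HashMap<>();
            properties.put("shardId", "shard-1");
            properties.put("partitionById", "true");
            properties.put("purgeByShardIdEnabled", "true");

            SiddhiManager siddhiManager = new SiddhiManager();
            siddhiManager.setConfigManager(new InMemoryConfigManager(properties, null));
            // With this PR, aggregations deployed on this manager with @purge
            // enabled would delete only rows whose SHARD_ID matches "shard-1".
        }
    }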
@@ -280,9 +283,10 @@ public static AggregationRuntime parse(AggregationDefinition aggregationDefiniti
enablePartitioning = enableElement == null || Boolean.parseBoolean(enableElement);
}

boolean shouldPartitionById = Boolean.parseBoolean(configManager.extractProperty("partitionById"));

if (enablePartitioning || shouldPartitionById) {
if (!enablePartitioning) {
enablePartitioning = Boolean.parseBoolean(configManager.extractProperty("partitionById"));
}
if (enablePartitioning) {
if (shardId == null) {
throw new SiddhiAppCreationException("Configuration 'shardId' not provided for @partitionById " +
"annotation");
@@ -473,7 +477,7 @@ public static AggregationRuntime parse(AggregationDefinition aggregationDefiniti
IncrementalDataPurger incrementalDataPurger = new IncrementalDataPurger();
incrementalDataPurger.init(aggregationDefinition, new StreamEventFactory(processedMetaStreamEvent)
, aggregationTables, isProcessingOnExternalTime, siddhiQueryContext, aggregationDurations, timeZone,
windowMap, aggregationMap);
windowMap, aggregationMap, shardId, enablePartitioning, purgedBySharedId);

//Recreate in-memory data from tables
IncrementalExecutorsInitialiser incrementalExecutorsInitialiser = new IncrementalExecutorsInitialiser(