Skip to content

Commit

Permalink
Improve timerange validation for retransmission from view (#1916)
Browse files Browse the repository at this point in the history
* Improve timerange validation for retransmission from view

* Google java format fix

* Add button/link to the retransmission from view documentation, shown in the retransmission popup

* Fix code docs

* Fix code docs

* Remove accidental changes

* Fix ui code

* Linter fixes

* Review fixes

* CR fixes

* Introduce retransmission type

* Fix hermes-console
  • Loading branch information
faderskd authored Nov 19, 2024
1 parent 97198f4 commit 7f28973
Show file tree
Hide file tree
Showing 23 changed files with 601 additions and 253 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package pl.allegro.tech.hermes.api;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import jakarta.validation.constraints.NotNull;
import java.time.Instant;
import pl.allegro.tech.hermes.api.constraints.TimeRangeForTopicRetransmission;
import pl.allegro.tech.hermes.api.jackson.InstantIsoSerializer;

@TimeRangeForTopicRetransmission
public final class OfflineRetransmissionFromTopicRequest extends OfflineRetransmissionRequest {

@NotNull private final String sourceTopic;
@NotNull private final Instant startTimestamp;
@NotNull private final Instant endTimestamp;

@JsonCreator
public OfflineRetransmissionFromTopicRequest(
@JsonProperty("sourceTopic") String sourceTopic,
@JsonProperty("targetTopic") String targetTopic,
@JsonProperty("startTimestamp") String startTimestamp,
@JsonProperty("endTimestamp") String endTimestamp) {
super(RetransmissionType.TOPIC, targetTopic);
this.sourceTopic = sourceTopic;
this.startTimestamp = initializeTimestamp(startTimestamp);
this.endTimestamp = initializeTimestamp(endTimestamp);
}

public String getSourceTopic() {
return sourceTopic;
}

@JsonSerialize(using = InstantIsoSerializer.class)
public Instant getStartTimestamp() {
return startTimestamp;
}

@JsonSerialize(using = InstantIsoSerializer.class)
public Instant getEndTimestamp() {
return endTimestamp;
}

@Override
public String toString() {
return "OfflineRetransmissionFromTopicRequest{"
+ "sourceTopic='"
+ sourceTopic
+ '\''
+ ", targetTopic='"
+ getTargetTopic()
+ '\''
+ ", startTimestamp="
+ startTimestamp
+ ", endTimestamp="
+ endTimestamp
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package pl.allegro.tech.hermes.api;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public final class OfflineRetransmissionFromViewRequest extends OfflineRetransmissionRequest {

private final String sourceViewPath;

@JsonCreator
public OfflineRetransmissionFromViewRequest(
@JsonProperty("sourceViewPath") String sourceViewPath,
@JsonProperty("targetTopic") String targetTopic) {
super(RetransmissionType.VIEW, targetTopic);
this.sourceViewPath = sourceViewPath;
}

public String getSourceViewPath() {
return sourceViewPath;
}

@Override
public String toString() {
return "OfflineRetransmissionFromViewRequest{"
+ "sourceViewPath='"
+ sourceViewPath
+ '\''
+ ", targetTopic='"
+ getTargetTopic()
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
package pl.allegro.tech.hermes.api;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.constraints.OneSourceRetransmission;
import pl.allegro.tech.hermes.api.jackson.InstantIsoSerializer;

@OneSourceRetransmission
public class OfflineRetransmissionRequest {
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = OfflineRetransmissionFromViewRequest.class, name = "view"),
@JsonSubTypes.Type(value = OfflineRetransmissionFromTopicRequest.class, name = "topic")
})
public sealed class OfflineRetransmissionRequest
permits OfflineRetransmissionFromTopicRequest, OfflineRetransmissionFromViewRequest {

private static final List<DateTimeFormatter> formatters =
List.of(
Expand All @@ -26,27 +26,28 @@ public class OfflineRetransmissionRequest {
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm'Z'").withZone(ZoneId.of("UTC")));
private static final Logger logger = LoggerFactory.getLogger(OfflineRetransmissionRequest.class);

private final String sourceViewPath;
private final String sourceTopic;
private final RetransmissionType type;
@NotEmpty private final String targetTopic;
@NotNull private Instant startTimestamp;
@NotNull private Instant endTimestamp;

@JsonCreator
public OfflineRetransmissionRequest(
@JsonProperty("sourceViewPath") String sourceViewPath,
@JsonProperty("sourceTopic") String sourceTopic,
@JsonProperty("targetTopic") String targetTopic,
@JsonProperty("startTimestamp") String startTimestamp,
@JsonProperty("endTimestamp") String endTimestamp) {
this.sourceViewPath = sourceViewPath;
this.sourceTopic = sourceTopic;
public OfflineRetransmissionRequest(RetransmissionType type, String targetTopic) {
this.type = type;
this.targetTopic = targetTopic;
this.startTimestamp = initializeTimestamp(startTimestamp);
this.endTimestamp = initializeTimestamp(endTimestamp);
}

private Instant initializeTimestamp(String timestamp) {
public RetransmissionType getType() {
return type;
}

public String getTargetTopic() {
return targetTopic;
}

public enum RetransmissionType {
VIEW,
TOPIC
}

public static Instant initializeTimestamp(String timestamp) {
if (timestamp == null) {
return null;
}
Expand All @@ -62,45 +63,4 @@ private Instant initializeTimestamp(String timestamp) {
logger.warn("Provided date [{}] has an invalid format", timestamp);
return null;
}

public Optional<String> getSourceViewPath() {
return Optional.ofNullable(sourceViewPath);
}

public Optional<String> getSourceTopic() {
return Optional.ofNullable(sourceTopic);
}

public String getTargetTopic() {
return targetTopic;
}

@JsonSerialize(using = InstantIsoSerializer.class)
public Instant getStartTimestamp() {
return startTimestamp;
}

@JsonSerialize(using = InstantIsoSerializer.class)
public Instant getEndTimestamp() {
return endTimestamp;
}

@Override
public String toString() {
return "OfflineRetransmissionRequest{"
+ "sourceTopic='"
+ sourceTopic
+ '\''
+ ", sourceViewPath='"
+ sourceViewPath
+ '\''
+ ", targetTopic='"
+ targetTopic
+ '\''
+ ", startTimestamp="
+ startTimestamp
+ ", endTimestamp="
+ endTimestamp
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -1,83 +1,127 @@
package pl.allegro.tech.hermes.api;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import jakarta.annotation.Nullable;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import pl.allegro.tech.hermes.api.OfflineRetransmissionRequest.RetransmissionType;
import pl.allegro.tech.hermes.api.jackson.InstantIsoSerializer;
import pl.allegro.tech.hermes.api.jackson.OptionalInstantIsoSerializer;

public class OfflineRetransmissionTask {
private final RetransmissionType type;
private final String taskId;
private final OfflineRetransmissionRequest request;
@Nullable private final String sourceViewPath;
@Nullable private final String sourceTopic;
private final String targetTopic;
@Nullable private final Instant startTimestamp;
@Nullable private final Instant endTimestamp;
private final Instant createdAt;

@JsonCreator
public OfflineRetransmissionTask(
@JsonProperty("taskId") String taskId,
@JsonProperty("sourceViewPath") String sourceViewPath,
@JsonProperty("sourceTopic") String sourceTopic,
@JsonProperty("targetTopic") String targetTopic,
@JsonProperty("startTimestamp") Instant startTimestamp,
@JsonProperty("endTimestamp") Instant endTimestamp,
@JsonProperty("createdAt") Instant createdAt) {
this(
taskId,
new OfflineRetransmissionRequest(
sourceViewPath,
sourceTopic,
targetTopic,
startTimestamp.toString(),
endTimestamp.toString()),
createdAt);
RetransmissionType type,
String taskId,
@Nullable String sourceViewPath,
@Nullable String sourceTopic,
String targetTopic,
@Nullable Instant startTimestamp,
@Nullable Instant endTimestamp,
Instant createdAt) {
this.taskId = taskId;
this.sourceViewPath = sourceViewPath;
this.sourceTopic = sourceTopic;
this.targetTopic = targetTopic;
this.startTimestamp = startTimestamp;
this.endTimestamp = endTimestamp;
this.createdAt = createdAt;
this.type = type;
}

@JsonCreator
public OfflineRetransmissionTask(
String taskId, OfflineRetransmissionRequest request, Instant createdAt) {
@JsonProperty("type") @Nullable RetransmissionType type,
@JsonProperty("taskId") String taskId,
@JsonProperty("sourceViewPath") @Nullable String sourceViewPath,
@JsonProperty("sourceTopic") @Nullable String sourceTopic,
@JsonProperty("targetTopic") String targetTopic,
@JsonProperty("startTimestamp") @Nullable String startTimestamp,
@JsonProperty("endTimestamp") @Nullable String endTimestamp,
@JsonProperty("createdAt") String createdAt) {
/*
TODO: Needed for backward compatibility when reading existing retransmissions from zookeeper, remove this once the
new version is rolled out to all environments.
*/
this.type = Objects.requireNonNullElse(type, RetransmissionType.TOPIC);
this.taskId = taskId;
this.request = request;
this.createdAt = createdAt;
this.sourceViewPath = sourceViewPath;
this.sourceTopic = sourceTopic;
this.targetTopic = targetTopic;
this.startTimestamp = OfflineRetransmissionFromTopicRequest.initializeTimestamp(startTimestamp);
this.endTimestamp = OfflineRetransmissionFromTopicRequest.initializeTimestamp(endTimestamp);
this.createdAt = OfflineRetransmissionFromTopicRequest.initializeTimestamp(createdAt);
}

public String getTaskId() {
return taskId;
}

public Optional<String> getSourceTopic() {
return request.getSourceTopic();
return Optional.ofNullable(sourceTopic);
}

public Optional<String> getSourceViewPath() {
return request.getSourceViewPath();
return Optional.ofNullable(sourceViewPath);
}

public String getTargetTopic() {
return request.getTargetTopic();
return targetTopic;
}

@JsonSerialize(using = InstantIsoSerializer.class)
public Instant getStartTimestamp() {
return request.getStartTimestamp();
@JsonSerialize(using = OptionalInstantIsoSerializer.class)
public Optional<Instant> getStartTimestamp() {
return Optional.ofNullable(startTimestamp);
}

@JsonSerialize(using = InstantIsoSerializer.class)
public Instant getEndTimestamp() {
return request.getEndTimestamp();
@JsonSerialize(using = OptionalInstantIsoSerializer.class)
public Optional<Instant> getEndTimestamp() {
return Optional.ofNullable(endTimestamp);
}

@JsonSerialize(using = InstantIsoSerializer.class)
public Instant getCreatedAt() {
return createdAt;
}

@JsonIgnore
public OfflineRetransmissionRequest getRequest() {
return request;
public RetransmissionType getType() {
return type;
}

@Override
public String toString() {
return "OfflineRetransmissionTask{" + "taskId='" + taskId + '\'' + ", request=" + request + '}';
return "OfflineRetransmissionTask{"
+ "type="
+ type
+ ", taskId='"
+ taskId
+ '\''
+ ", sourceViewPath='"
+ sourceViewPath
+ '\''
+ ", sourceTopic='"
+ sourceTopic
+ '\''
+ ", targetTopic='"
+ targetTopic
+ '\''
+ ", startTimestamp="
+ startTimestamp
+ ", endTimestamp="
+ endTimestamp
+ ", createdAt="
+ createdAt
+ '}';
}
}
Loading

0 comments on commit 7f28973

Please sign in to comment.