Skip to content

Commit

Permalink
dnm: mint source timestamps in the future
Browse files Browse the repository at this point in the history
  • Loading branch information
teskje committed Oct 22, 2024
1 parent 85a65e6 commit a1a680b
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 2 deletions.
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def get_default_system_parameters(
"storage_source_decode_fuel": "100000",
"storage_use_reclock_v2": "true",
"storage_reclock_to_latest": "true",
"source_time_offset": "1000",
"timestamp_oracle": "postgres",
"wait_catalog_consolidation_on_startup": "true",
"with_0dt_deployment_max_wait": "900s",
Expand Down
7 changes: 7 additions & 0 deletions src/storage-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ pub const STORAGE_RECLOCK_TO_LATEST: Config<bool> = Config::new(
"Whether to mint reclock bindings based on the latest probed offset or the latest ingested offset."
);

pub const SOURCE_TIME_OFFSET: Config<usize> = Config::new(
"source_time_offset",
0,
"Offset to add to times minted by sources.",
);

/// Adds the full set of all storage `Config`s.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
Expand All @@ -244,4 +250,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&STORAGE_SUSPEND_AND_RESTART_DELAY)
.add(&STORAGE_USE_RECLOCK_V2)
.add(&STORAGE_RECLOCK_TO_LATEST)
.add(&SOURCE_TIME_OFFSET)
}
12 changes: 10 additions & 2 deletions src/storage/src/source/source_reader_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use mz_ore::error::ErrorExt;
use mz_ore::now::NowFn;
use mz_ore::vec::VecExt;
use mz_persist_client::cache::PersistClientCache;
use mz_repr::{Diff, GlobalId, RelationDesc, Row};
use mz_repr::{Diff, GlobalId, RelationDesc, Row, TimestampManipulation};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs;
Expand Down Expand Up @@ -525,6 +525,11 @@ where
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let reclock_to_latest = dyncfgs::STORAGE_RECLOCK_TO_LATEST.get(&config.config.config_set());
let time_offset = mz_repr::Timestamp::from(
dyncfgs::SOURCE_TIME_OFFSET.get(&config.config.config_set()) as u64,
);

info!("source_time_offset={time_offset:?}");

let mut prev_probe: Option<Probe<FromTime>> = None;
let timestamp_interval_ms: u64 = timestamp_interval
Expand Down Expand Up @@ -553,7 +558,10 @@ where
});
prev_probe = new_probe;
let probe = prev_probe.clone().unwrap();
(probe.probe_ts, probe.upstream_frontier)
(
probe.probe_ts.step_forward_by(&time_offset),
probe.upstream_frontier,
)
} else {
ticker.tick().await;
// We only proceed if the source upper frontier is not the minimum frontier. This
Expand Down

0 comments on commit a1a680b

Please sign in to comment.