Skip to content

Commit

Permalink
Rocksdb manual flush code changes
Browse files Browse the repository at this point in the history
  • Loading branch information
neethuhaneesha committed Jan 17, 2025
1 parent d735250 commit 56d5c17
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 32 deletions.
6 changes: 3 additions & 3 deletions fdbclient/ServerKnobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_CF_METRICS_DELAY, 900.0 );
init( ROCKSDB_MAX_LOG_FILE_SIZE, 10485760 ); // 10MB.
init( ROCKSDB_KEEP_LOG_FILE_NUM, 100 ); // Keeps 1GB log per storage server.
// Does manual flushes at regular intervals(seconds), incase rocksdb did not flush. Feature disable if the value is 0.
init( ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, 0 ); if( randomize && BUGGIFY ) ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL = deterministicRandom()->randomInt(4, 10);
// Does manual flushes at regular intervals (seconds), in case rocksdb did not flush. Feature disabled if the value is 0.
init( ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, 600 ); if( isSimulated ) ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL = deterministicRandom()->randomInt(4, 1200);
init( ROCKSDB_SKIP_STATS_UPDATE_ON_OPEN, true );
init( ROCKSDB_SKIP_FILE_SIZE_CHECK_ON_OPEN, true );
init( ROCKSDB_FULLFILE_CHECKSUM, false ); if ( randomize && BUGGIFY ) ROCKSDB_FULLFILE_CHECKSUM = true;
Expand All @@ -609,7 +609,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_MEMTABLE_PROTECTION_BYTES_PER_KEY, 0 ); if ( randomize && BUGGIFY ) ROCKSDB_MEMTABLE_PROTECTION_BYTES_PER_KEY = 8; // Default: 0 (disabled). Supported values: 0, 1, 2, 4, 8.
// Block cache key-value checksum. Checksum is validated during read, so has non-trivial impact on read performance.
init( ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY, 0 ); if ( randomize && BUGGIFY ) ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY = 8; // Default: 0 (disabled). Supported values: 0, 1, 2, 4, 8.
init( ROCKSDB_METRICS_IN_SIMULATION, false );
init( ROCKSDB_ENABLE_NONDETERMINISM, false );
init( SHARDED_ROCKSDB_ALLOW_WRITE_STALL_ON_FLUSH, false );
init( SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO, 0.01 ); if (isSimulated) SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO = deterministicRandom()->random01();
init( SHARD_METADATA_SCAN_BYTES_LIMIT, 10485760 ); // 10MB
Expand Down
8 changes: 4 additions & 4 deletions fdbclient/include/fdbclient/ServerKnobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -578,10 +578,10 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl<ServerKno
int ROCKSDB_WRITEBATCH_PROTECTION_BYTES_PER_KEY;
int ROCKSDB_MEMTABLE_PROTECTION_BYTES_PER_KEY;
int ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY;
bool ROCKSDB_METRICS_IN_SIMULATION; // Whether rocksdb traceevent metrics will be emitted in simulation. Note that
// turning this on in simulation could lead to non-deterministic runs since we
// rely on rocksdb metadata. This knob also applies to sharded rocks storage
// engine.
bool ROCKSDB_ENABLE_NONDETERMINISM; // Whether rocksdb nondeterministic behavior should be enabled in simulation.
// Note that turning this on in simulation could lead to non-deterministic runs
// since we rely on rocksdb metadata. This knob also applies to sharded rocks
// storage engine.
bool SHARDED_ROCKSDB_ALLOW_WRITE_STALL_ON_FLUSH;
int SHARDED_ROCKSDB_MEMTABLE_MAX_RANGE_DELETIONS;
double SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO;
Expand Down
56 changes: 34 additions & 22 deletions fdbserver/KeyValueStoreRocksDB.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class SharedRocksDBState {
rocksdb::Options getOptions() const { return rocksdb::Options(this->dbOptions, this->cfOptions); }
rocksdb::ReadOptions getReadOptions() { return this->readOptions; }
rocksdb::FlushOptions getFlushOptions() { return this->flushOptions; }
const double getLastFlushTime() { return this->lastFlushTime_; }
void setLastFlushTime(double lastFlushTime) { this->lastFlushTime_ = lastFlushTime; }

private:
const UID id;
Expand All @@ -120,6 +122,7 @@ class SharedRocksDBState {
rocksdb::ColumnFamilyOptions cfOptions;
rocksdb::ReadOptions readOptions;
rocksdb::FlushOptions flushOptions;
std::atomic<double> lastFlushTime_;
};

SharedRocksDBState::SharedRocksDBState(UID id)
Expand Down Expand Up @@ -374,12 +377,14 @@ class RocksDBErrorListener : public rocksdb::EventListener {

class RocksDBEventListener : public rocksdb::EventListener {
public:
RocksDBEventListener(std::shared_ptr<double> lastFlushTime) : lastFlushTime(lastFlushTime){};
RocksDBEventListener(std::shared_ptr<SharedRocksDBState> sharedState) : sharedState(sharedState){};

void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override { *lastFlushTime = now(); }
void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override {
sharedState->setLastFlushTime(now());
}

private:
std::shared_ptr<double> lastFlushTime;
std::shared_ptr<SharedRocksDBState> sharedState;
};

using DB = rocksdb::DB*;
Expand Down Expand Up @@ -986,19 +991,30 @@ ACTOR Future<Void> flowLockLogger(UID id, const FlowLock* readLock, const FlowLo
}
}

ACTOR Future<Void> manualFlush(UID id,
rocksdb::DB* db,
std::shared_ptr<SharedRocksDBState> sharedState,
std::shared_ptr<double> lastFlushTime,
CF cf) {
ACTOR Future<Void> manualFlush(UID id, rocksdb::DB* db, std::shared_ptr<SharedRocksDBState> sharedState, CF cf) {
if (SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) {
state rocksdb::FlushOptions fOptions = sharedState->getFlushOptions();
state double waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL;
state double currTime = 0;
state int timeElapsedAfterLastFlush = 0;
loop {
wait(delay(SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL));
wait(delay(waitTime));

if ((now() - *lastFlushTime) > SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) {
currTime = now();
timeElapsedAfterLastFlush = currTime - sharedState->getLastFlushTime();
if (timeElapsedAfterLastFlush >= SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) {
db->Flush(fOptions, cf);
TraceEvent e("RocksDBManualFlush", id);
waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL;
TraceEvent("RocksDBManualFlush", id).detail("TimeElapsedAfterLastFlush", timeElapsedAfterLastFlush);
} else {
waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL - timeElapsedAfterLastFlush;
}

// The above code generates different waitTimes based on rocksdb background flushes, which causes
// non-deterministic behavior. Setting constant waitTimes in simulation avoids this; the variable-wait
// behavior is enabled only in the RocksdbNondeterministic (ROCKSDB_ENABLE_NONDETERMINISM=true) test.
if (g_network->isSimulated() && !SERVER_KNOBS->ROCKSDB_ENABLE_NONDETERMINISM) {
waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL;
}
}
}
Expand Down Expand Up @@ -1289,11 +1305,9 @@ struct RocksDBKeyValueStore : IKeyValueStore {
const FlowLock* fetchLock,
std::shared_ptr<RocksDBErrorListener> errorListener,
std::shared_ptr<RocksDBEventListener> eventListener,
std::shared_ptr<double> lastFlushTime,
Counters& counters)
: path(std::move(path)), metrics(metrics), readLock(readLock), fetchLock(fetchLock),
errorListener(errorListener), eventListener(eventListener), lastFlushTime(lastFlushTime),
counters(counters) {}
errorListener(errorListener), eventListener(eventListener), counters(counters) {}

double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
};
Expand Down Expand Up @@ -1358,7 +1372,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
// The current thread and main thread are the same when the code runs in simulation.
// blockUntilReady() is getting the thread into deadlock state, so directly calling
// the metricsLogger.
if (SERVER_KNOBS->ROCKSDB_METRICS_IN_SIMULATION) {
if (SERVER_KNOBS->ROCKSDB_ENABLE_NONDETERMINISM) {
a.metrics = rocksDBMetricLogger(id,
sharedState,
options.statistics,
Expand All @@ -1368,10 +1382,10 @@ struct RocksDBKeyValueStore : IKeyValueStore {
&a.counters,
cf) &&
flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) &&
manualFlush(id, db, sharedState, a.lastFlushTime, cf);
manualFlush(id, db, sharedState, cf);
} else {
a.metrics = flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) &&
manualFlush(id, db, sharedState, a.lastFlushTime, cf);
manualFlush(id, db, sharedState, cf);
}
} else {
onMainThread([&] {
Expand All @@ -1384,7 +1398,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
&a.counters,
cf) &&
flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) &&
manualFlush(id, db, sharedState, a.lastFlushTime, cf);
manualFlush(id, db, sharedState, cf);
return Future<bool>(true);
}).blockUntilReady();
}
Expand Down Expand Up @@ -1898,8 +1912,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
errorListener(std::make_shared<RocksDBErrorListener>(id)), errorFuture(errorListener->getFuture()) {
lastFlushTime = std::make_shared<double>(now());
eventListener = std::make_shared<RocksDBEventListener>(lastFlushTime);
eventListener = std::make_shared<RocksDBEventListener>(sharedState);
// In simulation, run the reader/writer threads as Coro threads (i.e. in the network thread). The storage engine
// is still multi-threaded as background compaction threads are still present. Reads/writes to disk will also
// block the network thread in a way that would be unacceptable in production but is a necessary evil here. When
Expand Down Expand Up @@ -2093,7 +2106,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
return openFuture;
}
auto a = std::make_unique<Writer::OpenAction>(
path, metrics, &readSemaphore, &fetchSemaphore, errorListener, eventListener, lastFlushTime, counters);
path, metrics, &readSemaphore, &fetchSemaphore, errorListener, eventListener, counters);
openFuture = a->done.getFuture();
writeThread->post(a.release());
return openFuture;
Expand Down Expand Up @@ -2427,7 +2440,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Reference<IThreadPool> readThreads;
std::shared_ptr<RocksDBErrorListener> errorListener;
std::shared_ptr<RocksDBEventListener> eventListener;
std::shared_ptr<double> lastFlushTime;
Future<Void> errorFuture;
Promise<Void> closePromise;
Future<Void> openFuture;
Expand Down
2 changes: 1 addition & 1 deletion fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3713,7 +3713,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
} else {
auto a = std::make_unique<Writer::OpenAction>(&shardManager, metrics, &readSemaphore, &fetchSemaphore);
openFuture = a->done.getFuture();
if (SERVER_KNOBS->ROCKSDB_METRICS_IN_SIMULATION) {
if (SERVER_KNOBS->ROCKSDB_ENABLE_NONDETERMINISM) {
this->metrics =
ShardManager::shardMetricsLogger(this->rState, openFuture, &shardManager) &&
rocksDBAggregatedMetricsLogger(this->rState, openFuture, rocksDBMetrics, &shardManager, this->path);
Expand Down
2 changes: 1 addition & 1 deletion tests/fast/RocksdbNondeterministicTest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
storageEngineType = 4 # Always pick RocksDB

[[knobs]]
rocksdb_metrics_in_simulation = true # This can cause non-determinism, but we don't do unseed check for this test
rocksdb_enable_nondeterminism = true # This can cause non-determinism, but we don't do unseed check for this test

[[test]]
testTitle = 'Clogged'
Expand Down
2 changes: 1 addition & 1 deletion tests/fast/ShardedRocksNondeterministicTest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ storageEngineType = 5 # Always pick ShardedRocks

# These can cause non-determinism, but we don't do unseed check for this test
[[knobs]]
rocksdb_metrics_in_simulation = true
rocksdb_enable_nondeterminism = true
shard_encode_location_metadata = true

[[test]]
Expand Down

0 comments on commit 56d5c17

Please sign in to comment.