Skip to content

Commit

Permalink
Rocksdb manual flush code changes
Browse files Browse the repository at this point in the history
  • Loading branch information
neethuhaneesha committed Dec 23, 2024
1 parent 602df45 commit 984e475
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
4 changes: 2 additions & 2 deletions fdbclient/ServerKnobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,8 +573,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_CF_METRICS_DELAY, 900.0 );
init( ROCKSDB_MAX_LOG_FILE_SIZE, 10485760 ); // 10MB.
init( ROCKSDB_KEEP_LOG_FILE_NUM, 100 ); // Keeps 1GB log per storage server.
// Does manual flushes at regular intervals(seconds), incase rocksdb did not flush. Feature disable if the value is 0.
init( ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, 0 ); if( randomize && BUGGIFY ) ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL = deterministicRandom()->randomInt(4, 10);
// Does manual flushes at regular intervals(seconds), incase rocksdb did not flush. Feature disabled if the value is 0.
init( ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, 600 ); if( isSimulated ) ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL = deterministicRandom()->randomInt(4, 1200);
init( ROCKSDB_SKIP_STATS_UPDATE_ON_OPEN, true );
init( ROCKSDB_SKIP_FILE_SIZE_CHECK_ON_OPEN, true );
init( ROCKSDB_FULLFILE_CHECKSUM, false ); if ( randomize && BUGGIFY ) ROCKSDB_FULLFILE_CHECKSUM = true;
Expand Down
17 changes: 14 additions & 3 deletions fdbserver/KeyValueStoreRocksDB.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ class RocksDBEventListener : public rocksdb::EventListener {
void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override { *lastFlushTime = now(); }

private:
// lastFlushTime is used by two threads. One Thread is reading the value and the other thread is updating the value.
// If the reader thread gets a wrong value due to race, that will be still fine in this case(probably an extra flush
// or no flush). Considering the cost of atomic, avoided it here in this case.
std::shared_ptr<double> lastFlushTime;
};

Expand Down Expand Up @@ -992,12 +995,20 @@ ACTOR Future<Void> manualFlush(UID id,
CF cf) {
if (SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) {
state rocksdb::FlushOptions fOptions = sharedState->getFlushOptions();
state double waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL;
state double currTime = 0;
state int timeElapsedAfterLastFlush = 0;
loop {
wait(delay(SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL));
wait(delay(waitTime));

if ((now() - *lastFlushTime) > SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) {
currTime = now();
timeElapsedAfterLastFlush = currTime - *lastFlushTime;
if (timeElapsedAfterLastFlush >= SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) {
db->Flush(fOptions, cf);
TraceEvent e("RocksDBManualFlush", id);
waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL;
TraceEvent("RocksDBManualFlush", id).detail("TimeElapsedAfterLastFlush", timeElapsedAfterLastFlush);
} else {
waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL - timeElapsedAfterLastFlush;
}
}
}
Expand Down

0 comments on commit 984e475

Please sign in to comment.