From 08a49f14e67cb888957aa76a97b7c83bfbbdc39a Mon Sep 17 00:00:00 2001 From: neethuhaneesha Date: Thu, 19 Dec 2024 12:29:34 -0800 Subject: [PATCH] Rocksdb manual flush code changes --- fdbclient/ServerKnobs.cpp | 4 ++-- fdbserver/KeyValueStoreRocksDB.actor.cpp | 17 ++++++++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 0ae109319fe..f3f8e5f96e9 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -595,8 +595,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( ROCKSDB_CF_METRICS_DELAY, 900.0 ); init( ROCKSDB_MAX_LOG_FILE_SIZE, 10485760 ); // 10MB. init( ROCKSDB_KEEP_LOG_FILE_NUM, 100 ); // Keeps 1GB log per storage server. - // Does manual flushes at regular intervals(seconds), incase rocksdb did not flush. Feature disable if the value is 0. - init( ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, 0 ); if( randomize && BUGGIFY ) ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL = deterministicRandom()->randomInt(4, 10); + // Does manual flushes at regular intervals(seconds), incase rocksdb did not flush. Feature disabled if the value is 0. + init( ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, 600 ); if( isSimulated ) ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL = deterministicRandom()->randomInt(4, 1200); init( ROCKSDB_SKIP_STATS_UPDATE_ON_OPEN, true ); init( ROCKSDB_SKIP_FILE_SIZE_CHECK_ON_OPEN, true ); init( ROCKSDB_FULLFILE_CHECKSUM, false ); if ( randomize && BUGGIFY ) ROCKSDB_FULLFILE_CHECKSUM = true; diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp index df940a51f37..57e0573c4a7 100644 --- a/fdbserver/KeyValueStoreRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -379,6 +379,9 @@ class RocksDBEventListener : public rocksdb::EventListener { void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override { *lastFlushTime = now(); } private: + // lastFlushTime is used by two threads. One Thread is reading the value and the other thread is updating the value. + // If the reader thread gets a wrong value due to race, that will be still fine in this case(probably an extra flush + // or no flush). Considering the cost of atomic, avoided it here in this case. std::shared_ptr lastFlushTime; }; @@ -993,12 +996,20 @@ ACTOR Future manualFlush(UID id, CF cf) { if (SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) { state rocksdb::FlushOptions fOptions = sharedState->getFlushOptions(); + state double waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL; + state double currTime = 0; + state int timeElapsedAfterLastFlush = 0; loop { - wait(delay(SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL)); + wait(delay(waitTime)); - if ((now() - *lastFlushTime) > SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) { + currTime = now(); + timeElapsedAfterLastFlush = currTime - *lastFlushTime; + if (timeElapsedAfterLastFlush >= SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) { db->Flush(fOptions, cf); - TraceEvent e("RocksDBManualFlush", id); + waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL; + TraceEvent("RocksDBManualFlush", id).detail("TimeElapsedAfterLastFlush", timeElapsedAfterLastFlush); + } else { + waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL - timeElapsedAfterLastFlush; } } }