From 56d5c17c23a45d2361f0d639fd8f93f7fa51a9bb Mon Sep 17 00:00:00 2001 From: neethuhaneesha Date: Thu, 19 Dec 2024 12:29:34 -0800 Subject: [PATCH] Rocksdb manual flush code changes --- fdbclient/ServerKnobs.cpp | 6 +- fdbclient/include/fdbclient/ServerKnobs.h | 8 +-- fdbserver/KeyValueStoreRocksDB.actor.cpp | 56 +++++++++++-------- .../KeyValueStoreShardedRocksDB.actor.cpp | 2 +- tests/fast/RocksdbNondeterministicTest.toml | 2 +- .../ShardedRocksNondeterministicTest.toml | 2 +- 6 files changed, 44 insertions(+), 32 deletions(-) diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index d72eb240887..e59191dd37d 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -597,8 +597,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( ROCKSDB_CF_METRICS_DELAY, 900.0 ); init( ROCKSDB_MAX_LOG_FILE_SIZE, 10485760 ); // 10MB. init( ROCKSDB_KEEP_LOG_FILE_NUM, 100 ); // Keeps 1GB log per storage server. - // Does manual flushes at regular intervals(seconds), incase rocksdb did not flush. Feature disable if the value is 0. - init( ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, 0 ); if( randomize && BUGGIFY ) ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL = deterministicRandom()->randomInt(4, 10); + // Does manual flushes at regular intervals (seconds), in case rocksdb did not flush. Feature disabled if the value is 0. + init( ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, 600 ); if( isSimulated ) ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL = deterministicRandom()->randomInt(4, 1200); init( ROCKSDB_SKIP_STATS_UPDATE_ON_OPEN, true ); init( ROCKSDB_SKIP_FILE_SIZE_CHECK_ON_OPEN, true ); init( ROCKSDB_FULLFILE_CHECKSUM, false ); if ( randomize && BUGGIFY ) ROCKSDB_FULLFILE_CHECKSUM = true; @@ -609,7 +609,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( ROCKSDB_MEMTABLE_PROTECTION_BYTES_PER_KEY, 0 ); if ( randomize && BUGGIFY ) ROCKSDB_MEMTABLE_PROTECTION_BYTES_PER_KEY = 8; // Default: 0 (disabled). 
Supported values: 0, 1, 2, 4, 8. // Block cache key-value checksum. Checksum is validated during read, so has non-trivial impact on read performance. init( ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY, 0 ); if ( randomize && BUGGIFY ) ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY = 8; // Default: 0 (disabled). Supported values: 0, 1, 2, 4, 8. - init( ROCKSDB_METRICS_IN_SIMULATION, false ); + init( ROCKSDB_ENABLE_NONDETERMINISM, false ); init( SHARDED_ROCKSDB_ALLOW_WRITE_STALL_ON_FLUSH, false ); init( SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO, 0.01 ); if (isSimulated) SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO = deterministicRandom()->random01(); init( SHARD_METADATA_SCAN_BYTES_LIMIT, 10485760 ); // 10MB diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index 32fb7b3cc3a..2ad98931af0 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -578,10 +578,10 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpldbOptions, this->cfOptions); } rocksdb::ReadOptions getReadOptions() { return this->readOptions; } rocksdb::FlushOptions getFlushOptions() { return this->flushOptions; } + const double getLastFlushTime() { return this->lastFlushTime_; } + void setLastFlushTime(double lastFlushTime) { this->lastFlushTime_ = lastFlushTime; } private: const UID id; @@ -120,6 +122,7 @@ class SharedRocksDBState { rocksdb::ColumnFamilyOptions cfOptions; rocksdb::ReadOptions readOptions; rocksdb::FlushOptions flushOptions; + std::atomic lastFlushTime_; }; SharedRocksDBState::SharedRocksDBState(UID id) @@ -374,12 +377,14 @@ class RocksDBErrorListener : public rocksdb::EventListener { class RocksDBEventListener : public rocksdb::EventListener { public: - RocksDBEventListener(std::shared_ptr lastFlushTime) : lastFlushTime(lastFlushTime){}; + RocksDBEventListener(std::shared_ptr sharedState) : sharedState(sharedState){}; - void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo& 
info) override { *lastFlushTime = now(); } + void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override { + sharedState->setLastFlushTime(now()); + } private: - std::shared_ptr lastFlushTime; + std::shared_ptr sharedState; }; using DB = rocksdb::DB*; @@ -986,19 +991,30 @@ ACTOR Future flowLockLogger(UID id, const FlowLock* readLock, const FlowLo } } -ACTOR Future manualFlush(UID id, - rocksdb::DB* db, - std::shared_ptr sharedState, - std::shared_ptr lastFlushTime, - CF cf) { +ACTOR Future manualFlush(UID id, rocksdb::DB* db, std::shared_ptr sharedState, CF cf) { if (SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) { state rocksdb::FlushOptions fOptions = sharedState->getFlushOptions(); + state double waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL; + state double currTime = 0; + state int timeElapsedAfterLastFlush = 0; loop { - wait(delay(SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL)); + wait(delay(waitTime)); - if ((now() - *lastFlushTime) > SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) { + currTime = now(); + timeElapsedAfterLastFlush = currTime - sharedState->getLastFlushTime(); + if (timeElapsedAfterLastFlush >= SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) { db->Flush(fOptions, cf); - TraceEvent e("RocksDBManualFlush", id); + waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL; + TraceEvent("RocksDBManualFlush", id).detail("TimeElapsedAfterLastFlush", timeElapsedAfterLastFlush); + } else { + waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL - timeElapsedAfterLastFlush; + } + + // The above code generates different waitTimes based on rocksdb background flushes which causes non + // deterministic behavior. Setting constant waitTimes in simulation to avoid this. And enable the behavior + // only in RocksdbNondeterministic(ROCKSDB_ENABLE_NONDETERMINISM=true) test. 
+ if (g_network->isSimulated() && !SERVER_KNOBS->ROCKSDB_ENABLE_NONDETERMINISM) { + waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL; } } } @@ -1289,11 +1305,9 @@ struct RocksDBKeyValueStore : IKeyValueStore { const FlowLock* fetchLock, std::shared_ptr errorListener, std::shared_ptr eventListener, - std::shared_ptr lastFlushTime, Counters& counters) : path(std::move(path)), metrics(metrics), readLock(readLock), fetchLock(fetchLock), - errorListener(errorListener), eventListener(eventListener), lastFlushTime(lastFlushTime), - counters(counters) {} + errorListener(errorListener), eventListener(eventListener), counters(counters) {} double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; } }; @@ -1358,7 +1372,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { // The current thread and main thread are same when the code runs in simulation. // blockUntilReady() is getting the thread into deadlock state, so directly calling // the metricsLogger. - if (SERVER_KNOBS->ROCKSDB_METRICS_IN_SIMULATION) { + if (SERVER_KNOBS->ROCKSDB_ENABLE_NONDETERMINISM) { a.metrics = rocksDBMetricLogger(id, sharedState, options.statistics, @@ -1368,10 +1382,10 @@ struct RocksDBKeyValueStore : IKeyValueStore { &a.counters, cf) && flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) && - manualFlush(id, db, sharedState, a.lastFlushTime, cf); + manualFlush(id, db, sharedState, cf); } else { a.metrics = flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) && - manualFlush(id, db, sharedState, a.lastFlushTime, cf); + manualFlush(id, db, sharedState, cf); } } else { onMainThread([&] { @@ -1384,7 +1398,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { &a.counters, cf) && flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) && - manualFlush(id, db, sharedState, a.lastFlushTime, cf); + manualFlush(id, db, sharedState, cf); return Future(true); }).blockUntilReady(); } 
@@ -1898,8 +1912,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX), numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX), errorListener(std::make_shared(id)), errorFuture(errorListener->getFuture()) { - lastFlushTime = std::make_shared(now()); - eventListener = std::make_shared(lastFlushTime); + eventListener = std::make_shared(sharedState); // In simluation, run the reader/writer threads as Coro threads (i.e. in the network thread. The storage engine // is still multi-threaded as background compaction threads are still present. Reads/writes to disk will also // block the network thread in a way that would be unacceptable in production but is a necessary evil here. When @@ -2093,7 +2106,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { return openFuture; } auto a = std::make_unique( - path, metrics, &readSemaphore, &fetchSemaphore, errorListener, eventListener, lastFlushTime, counters); + path, metrics, &readSemaphore, &fetchSemaphore, errorListener, eventListener, counters); openFuture = a->done.getFuture(); writeThread->post(a.release()); return openFuture; @@ -2427,7 +2440,6 @@ struct RocksDBKeyValueStore : IKeyValueStore { Reference readThreads; std::shared_ptr errorListener; std::shared_ptr eventListener; - std::shared_ptr lastFlushTime; Future errorFuture; Promise closePromise; Future openFuture; diff --git a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp index a542e5ed75c..5369fd24c3c 100644 --- a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp @@ -3713,7 +3713,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore { } else { auto a = std::make_unique(&shardManager, metrics, &readSemaphore, &fetchSemaphore); openFuture = a->done.getFuture(); - if (SERVER_KNOBS->ROCKSDB_METRICS_IN_SIMULATION) { + if 
(SERVER_KNOBS->ROCKSDB_ENABLE_NONDETERMINISM) { this->metrics = ShardManager::shardMetricsLogger(this->rState, openFuture, &shardManager) && rocksDBAggregatedMetricsLogger(this->rState, openFuture, rocksDBMetrics, &shardManager, this->path); diff --git a/tests/fast/RocksdbNondeterministicTest.toml b/tests/fast/RocksdbNondeterministicTest.toml index b063da332f7..72694310529 100644 --- a/tests/fast/RocksdbNondeterministicTest.toml +++ b/tests/fast/RocksdbNondeterministicTest.toml @@ -2,7 +2,7 @@ storageEngineType = 4 # Always pick RocksDB [[knobs]] -rocksdb_metrics_in_simulation = true # This can cause non-determinism, but we don't do unseed check for this test +rocksdb_enable_nondeterminism = true # This can cause non-determinism, but we don't do unseed check for this test [[test]] testTitle = 'Clogged' diff --git a/tests/fast/ShardedRocksNondeterministicTest.toml b/tests/fast/ShardedRocksNondeterministicTest.toml index 81847c8dee3..91828c5e6ce 100644 --- a/tests/fast/ShardedRocksNondeterministicTest.toml +++ b/tests/fast/ShardedRocksNondeterministicTest.toml @@ -3,7 +3,7 @@ storageEngineType = 5 # Always pick ShardedRocks # These can cause non-determinism, but we don't do unseed check for this test [[knobs]] -rocksdb_metrics_in_simulation = true +rocksdb_enable_nondeterminism = true shard_encode_location_metadata = true [[test]]