Adding WAL Replay functionality #1413
base: master
@@ -40,6 +40,7 @@ type DiceDBConfig struct {
WALRetentionMode string `mapstructure:"wal-retention-mode" default:"num-segments" description:"the new horizon for wal segment post cleanup. values: num-segments, time, checkpoint"`
WALMaxSegmentCount int `mapstructure:"wal-max-segment-count" default:"10" description:"the maximum number of segments to retain, if the retention mode is 'num-segments'"`
WALMaxSegmentRetentionDurationSec int `mapstructure:"wal-max-segment-retention-duration-sec" default:"600" description:"the maximum duration (in seconds) for wal segments retention"`
WALRecovery bool `mapstructure:"wal-recovery" default:"true" description:"enable recovery from wal"`
We should not have this option. By default if WAL is enabled, recovery should happen. We can get rid of this and reuse EnableWAL to put restoration behind the flag.
persistence.aof_file = "./dice-master.aof"
persistence.persistence_enabled = true
persistence.write_aof_on_cleanup = false
persistence.wal-dir = "./"
We are not using this file. The new config system does not need it: it is self-sufficient, running on default values and any existing files, so this file does not have to be checked in or even be present. We can safely delete it.
@@ -60,7 +62,7 @@ func NewCommandHandler(id string, responseChan, preprocessingChan chan *ops.Stor
cmdWatchSubscriptionChan chan watchmanager.WatchSubscription,
parser requestparser.Parser, shardManager *shard.ShardManager, gec chan error,
ioThreadReadChan chan []byte, ioThreadWriteChan chan interface{}, ioThreadErrChan chan error,
wl wal.AbstractWAL) *BaseCommandHandler {
wl wal.AbstractWAL, replay bool) *BaseCommandHandler {
Instead of passing this variable, we can use config.Config.EnableWAL to check whether the WAL is enabled. If it is, we can replay and restore. I recommend removing this argument here and checking the config right before the restoration.
@@ -214,7 +214,7 @@ func (s *Server) AcceptConnectionRequests(ctx context.Context, wg *sync.WaitGrou

handler := commandhandler.NewCommandHandler(cmdHandlerID, responseChan, preprocessingChan,
s.cmdWatchSubscriptionChan, parser, s.shardManager, s.globalErrorChan,
ioThreadReadChan, ioThreadWriteChan, ioThreadErrChan, s.wl)
ioThreadReadChan, ioThreadWriteChan, ioThreadErrChan, s.wl, false)
This additional argument becomes unnecessary once we make the changes I suggested above.
go func() {
if err := replayHandler.Start(ctx); err != nil {
slog.Error("WAL replay handler failed", slog.Any("error", err))
}
}()
Why does this need to happen in a thread? Ideally, DiceDB should not start accepting and serving the requests until the entire WAL is replayed on boot.
)

type AbstractWAL interface {
LogCommand([]byte) error
Close() error
Init(t time.Time) error
ForEachCommand(f func(c cmd.DiceDBCmd) error) error
ReplayWAL(c func(*WALEntry) error) error
The function should be called Replay and not ReplayWAL, given we are already under type AbstractWAL.
@@ -305,7 +308,82 @@ func (wal *AOF) deleteSegmentPeriodically() {
}
}

func (wal *AOF) ForEachCommand(f func(c cmd.DiceDBCmd) error) error {
// TODO: implement this method
func (wal *AOF) getSegmentFiles() ([]string, error) {
In Go, we do not and should not prefix functions with Get or get. For set it is okay because we are taking an action. But for get, the name should directly be the thing we are accessing. For example, getSegmentFiles should be just SegmentFiles or segmentFiles, depending on the visibility of the function.
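As a small illustration of this naming convention, the sketch below uses a hypothetical `segmentStore` type (not the PR's `AOF` type) that keeps its segment names in memory. The accessor is named after the thing it returns, while the mutator keeps a verb.

```go
package main

import (
	"fmt"
	"sort"
)

// segmentStore is a hypothetical type used only to illustrate naming.
type segmentStore struct {
	names []string
}

// segmentFiles returns the segment names in sorted order.
// It is unexported, so it starts with a lowercase letter and has no "get" prefix.
func (s *segmentStore) segmentFiles() []string {
	out := append([]string(nil), s.names...) // copy so the caller cannot mutate s.names
	sort.Strings(out)
	return out
}

// setSegmentFiles replaces the tracked segment names; a verb is fine for a mutator.
func (s *segmentStore) setSegmentFiles(names []string) {
	s.names = append([]string(nil), names...)
}

func main() {
	s := &segmentStore{}
	s.setSegmentFiles([]string{"seg-2.wal", "seg-0.wal", "seg-1.wal"})
	fmt.Println(s.segmentFiles())
}
```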

// Unmarshal entry
var entry WALEntry
MustUnmarshal(entryData, &entry)
What if this fails? I see you invoked MustUnmarshal. What is our strategy here?
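One possible answer, sketched under assumptions: decode with a function that returns an error instead of panicking, and let the replay loop decide whether to stop or skip. The `walEntry` type and `decodeEntry` function below are hypothetical, and `encoding/json` stands in for whatever encoding the real `WALEntry` uses, which this review does not show.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// walEntry is a hypothetical stand-in for WALEntry; the JSON encoding is an
// assumption made only so the sketch is runnable.
type walEntry struct {
	LogSequenceNumber uint64 `json:"lsn"`
	Data              []byte `json:"data"`
}

// decodeEntry returns an error for a corrupt record instead of panicking,
// so the caller can choose a recovery strategy.
func decodeEntry(raw []byte) (walEntry, error) {
	var e walEntry
	if err := json.Unmarshal(raw, &e); err != nil {
		return walEntry{}, fmt.Errorf("decode wal entry: %w", err)
	}
	return e, nil
}

func main() {
	if _, err := decodeEntry([]byte("{not json")); err != nil {
		// Here the replay simply stops; skipping the record or truncating
		// the segment are alternative strategies.
		fmt.Println("replay stopped:", err)
	}
}
```

Whichever strategy is chosen, returning the error keeps the decision in one place (the replay loop) rather than crashing the process from inside the decoder.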
// Validate CRC
expectedCRC := crc32.ChecksumIEEE(append(entry.Data, byte(entry.LogSequenceNumber)))
if entry.Crc32 != expectedCRC {
return fmt.Errorf("CRC mismatch for log sequence %d: expected %d, got %d",
Refer to the LOGGING practices and apply them across this entire patch.
// All rights reserved. Licensed under the BSD 3-Clause License. See LICENSE file in the project root for full license information.

package wal

import (
"database/sql"
"fmt"
If we are not supporting SQLite as a WAL engine, update the Config struct to reflect that; its list of possible values still referenced sqlite. Also, run go mod tidy to remove any stray dependencies.
@@ -182,6 +171,34 @@ func Start() {
go runServer(ctx, &serverWg, websocketServer, serverErrCh)
}

// Recovery from WAL logs
if config.Config.WALRecovery {
This check will change as per the comments above (use config.Config.EnableWAL instead of a separate recovery flag).
Adding minor refactoring to the WAL and implementing basic interface for WAL Replay by adding a separate new commandHandler for walReplay.
cc @arpitbbhayani @JyotinderSingh