diff --git a/internal/aof/aof.go b/internal/aof/aof.go index 42b9a807..6035c344 100644 --- a/internal/aof/aof.go +++ b/internal/aof/aof.go @@ -2,14 +2,17 @@ package aof import ( "bufio" + "bytes" "context" "io" "os" + "path" "strconv" "strings" "RedisShake/internal/entry" "RedisShake/internal/log" + "RedisShake/internal/rdb" ) const ( @@ -24,12 +27,17 @@ const ( type Loader struct { filePath string ch chan *entry.Entry + + name string + useRDBPreamble int } -func NewLoader(filePath string, ch chan *entry.Entry) *Loader { +func NewLoader(name string, useRDBPreamble int, filePath string, ch chan *entry.Entry) *Loader { ld := new(Loader) ld.ch = ch ld.filePath = filePath + ld.name = name + ld.useRDBPreamble = useRDBPreamble return ld } @@ -78,7 +86,29 @@ func (ld *Loader) LoadSingleAppendOnlyFile(ctx context.Context, timestamp int64) return Empty } } + isRDB := false + if ld.useRDBPreamble == 1 { + sig := make([]byte, 6) + n, err := fp.Read(sig) + if err != nil && err != io.EOF { + log.Infof("Reading signature the append only File %v: %v", path.Base(filePath), err) + return Failed + } + isRDB = (err == nil) && (n >= 5 && bytes.Equal(sig[:5], []byte("REDIS"))) || (n >= 6 && bytes.Equal(sig[:6], []byte("VALKEY"))) + + if _, err := fp.Seek(0, io.SeekStart); err != nil { + log.Infof("Unrecoverable error reading the append only File %v: %v", path.Base(filePath), err) + return Failed + } + } + reader := bufio.NewReader(fp) + if isRDB { //Skipped RDB checksum and has not been processed yet. + log.Infof("Reading RDB Base File on AOF loading...") + rdbLoader := rdb.NewLoader(ld.name, nil, filePath, ld.ch) + _ = rdbLoader.ParseRDBStream(ctx, reader) + log.Infof("[%s] RDB preamble parse done, switching to AOF stream...", ld.name) + } for { select { case <-ctx.Done(): diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index ae110fb3..3b4a2953 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -66,7 +66,9 @@ func NewLoader(name string, updateFunc func(int64), filPath string, ch chan *ent ld.ch = ch ld.filPath = filPath ld.name = name - ld.updateFunc = updateFunc + if updateFunc != nil { + ld.updateFunc = updateFunc + } return ld } @@ -85,9 +87,13 @@ func (ld *Loader) ParseRDB(ctx context.Context) int { } }() rd := bufio.NewReader(ld.fp) + return ld.ParseRDBStream(ctx, rd) +} + +func (ld *Loader) ParseRDBStream(ctx context.Context, rd *bufio.Reader) int { // magic + version buf := make([]byte, 9) - _, err = io.ReadFull(rd, buf) + _, err := io.ReadFull(rd, buf) if err != nil { log.Panicf(err.Error()) } @@ -208,6 +214,7 @@ func (ld *Loader) parseRDBEntry(ctx context.Context, rd *bufio.Reader) { case kFlagSelect: ld.nowDBId = int(structure.ReadLength(rd)) case kEOF: + io.ReadFull(rd, make([]byte, 8)) // read and ignore checksum return default: key := structure.ReadString(rd) diff --git a/internal/reader/aof_reader.go b/internal/reader/aof_reader.go index ea3a5fb6..56724e4b 100644 --- a/internal/reader/aof_reader.go +++ b/internal/reader/aof_reader.go @@ -72,13 +72,14 @@ func (r *aofReader) StartRead(ctx context.Context) []chan *entry.Entry { // start read aof go func() { - aofFileInfo := NewAOFFileInfo(r.path, r.ch) + aofFileInfo := NewAOFFileInfo(r.stat.AOFName, r.path, r.ch) // try load manifest file aofFileInfo.AOFLoadManifestFromDisk() manifestInfo := aofFileInfo.AOFManifest if manifestInfo == nil { // load single aof file log.Infof("start send single AOF path=[%s]", r.path) - aofLoader := aof.NewLoader(r.path, r.ch) + aofFileInfo.AOFUseRDBPreamble = 1 + aofLoader := aof.NewLoader(r.stat.AOFName, aofFileInfo.AOFUseRDBPreamble, r.path, r.ch) ret := aofLoader.LoadSingleAppendOnlyFile(ctx, r.stat.AOFTimestamp) if ret == AOFOk || ret == AOFTruncated { log.Infof("The AOF File was successfully loaded") @@ -88,8 +89,7 @@ func (r *aofReader) StartRead(ctx context.Context) []chan *entry.Entry { log.Infof("Send single AOF finished. path=[%s]", r.path) close(r.ch) } else { - aofLoader := NewAOFFileInfo(r.path, r.ch) - ret := aofLoader.LoadAppendOnlyFile(ctx, manifestInfo, r.stat.AOFTimestamp) + ret := aofFileInfo.LoadAppendOnlyFile(ctx, manifestInfo, r.stat.AOFTimestamp) if ret == AOFOk || ret == AOFTruncated { log.Infof("The AOF File was successfully loaded") } else { diff --git a/internal/reader/parsing_aof.go b/internal/reader/parsing_aof.go index 22889684..ccb5abee 100644 --- a/internal/reader/parsing_aof.go +++ b/internal/reader/parsing_aof.go @@ -62,8 +62,9 @@ func StringNeedsRepr(s string) int { } type INFO struct { + AOFName string AOFDirName string - AOFUseRDBPreamble int // TODO:not support parsing rdb preamble + AOFUseRDBPreamble int AOFManifest *AOFManifest AOFFileName string AOFCurrentSize int64 @@ -76,8 +77,9 @@ func (aofInfo *INFO) GetAOFDirName() string { return aofInfo.AOFDirName } -func NewAOFFileInfo(aofFilePath string, ch chan *entry.Entry) *INFO { +func NewAOFFileInfo(name, aofFilePath string, ch chan *entry.Entry) *INFO { return &INFO{ + AOFName: name, AOFDirName: filepath.Dir(aofFilePath), AOFUseRDBPreamble: 0, AOFManifest: nil, @@ -736,6 +738,6 @@ func (aofInfo *INFO) ParsingSingleAppendOnlyFile(ctx context.Context, FileName s return AOFOk } // load single aof file - aofSingleReader := aof.NewLoader(MakePath(aofInfo.AOFDirName, FileName), aofInfo.ch) + aofSingleReader := aof.NewLoader(aofInfo.AOFName, aofInfo.AOFUseRDBPreamble, MakePath(aofInfo.AOFDirName, FileName), aofInfo.ch) return aofSingleReader.LoadSingleAppendOnlyFile(ctx, AOFTimeStamp) }