Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion internal/aof/aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package aof

import (
"bufio"
"bytes"
"context"
"io"
"os"
"path"
"strconv"
"strings"

"RedisShake/internal/entry"
"RedisShake/internal/log"
"RedisShake/internal/rdb"
)

const (
Expand All @@ -24,12 +27,17 @@ const (
type Loader struct {
filePath string
ch chan *entry.Entry

name string
useRDBPreamble int
}

func NewLoader(filePath string, ch chan *entry.Entry) *Loader {
func NewLoader(name string, useRDBPreamble int, filePath string, ch chan *entry.Entry) *Loader {
ld := new(Loader)
ld.ch = ch
ld.filePath = filePath
ld.name = name
ld.useRDBPreamble = useRDBPreamble
return ld
}

Expand Down Expand Up @@ -78,7 +86,29 @@ func (ld *Loader) LoadSingleAppendOnlyFile(ctx context.Context, timestamp int64)
return Empty
}
}
isRDB := false
if ld.useRDBPreamble == 1 {
sig := make([]byte, 6)
n, err := fp.Read(sig)
if err != nil && err != io.EOF {
log.Infof("Reading signature the append only File %v: %v", path.Base(filePath), err)
return Failed
}
isRDB = (err == nil) && (n >= 5 && bytes.Equal(sig[:5], []byte("REDIS"))) || (n >= 6 && bytes.Equal(sig[:6], []byte("VALKEY")))

if _, err := fp.Seek(0, io.SeekStart); err != nil {
log.Infof("Unrecoverable error reading the append only File %v: %v", path.Base(filePath), err)
return Failed
}
}

reader := bufio.NewReader(fp)
if isRDB { //Skipped RDB checksum and has not been processed yet.
log.Infof("Reading RDB Base File on AOF loading...")
rdbLoader := rdb.NewLoader(ld.name, nil, filePath, ld.ch)
_ = rdbLoader.ParseRDBStream(ctx, reader)
log.Infof("[%s] RDB preamble parse done, switching to AOF stream...", ld.name)
}
for {
select {
case <-ctx.Done():
Expand Down
11 changes: 9 additions & 2 deletions internal/rdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ func NewLoader(name string, updateFunc func(int64), filPath string, ch chan *ent
ld.ch = ch
ld.filPath = filPath
ld.name = name
ld.updateFunc = updateFunc
if updateFunc != nil {
ld.updateFunc = updateFunc
}
return ld
}

Expand All @@ -85,9 +87,13 @@ func (ld *Loader) ParseRDB(ctx context.Context) int {
}
}()
rd := bufio.NewReader(ld.fp)
return ld.ParseRDBStream(ctx, rd)
}

func (ld *Loader) ParseRDBStream(ctx context.Context, rd *bufio.Reader) int {
// magic + version
buf := make([]byte, 9)
_, err = io.ReadFull(rd, buf)
_, err := io.ReadFull(rd, buf)
if err != nil {
log.Panicf(err.Error())
}
Expand Down Expand Up @@ -208,6 +214,7 @@ func (ld *Loader) parseRDBEntry(ctx context.Context, rd *bufio.Reader) {
case kFlagSelect:
ld.nowDBId = int(structure.ReadLength(rd))
case kEOF:
io.ReadFull(rd, make([]byte, 8)) // read and ignore checksum
return
default:
key := structure.ReadString(rd)
Expand Down
8 changes: 4 additions & 4 deletions internal/reader/aof_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,14 @@ func (r *aofReader) StartRead(ctx context.Context) []chan *entry.Entry {

// start read aof
go func() {
aofFileInfo := NewAOFFileInfo(r.path, r.ch)
aofFileInfo := NewAOFFileInfo(r.stat.AOFName, r.path, r.ch)
// try load manifest file
aofFileInfo.AOFLoadManifestFromDisk()
manifestInfo := aofFileInfo.AOFManifest
if manifestInfo == nil { // load single aof file
log.Infof("start send single AOF path=[%s]", r.path)
aofLoader := aof.NewLoader(r.path, r.ch)
aofFileInfo.AOFUseRDBPreamble = 1
aofLoader := aof.NewLoader(r.stat.AOFName, aofFileInfo.AOFUseRDBPreamble, r.path, r.ch)
ret := aofLoader.LoadSingleAppendOnlyFile(ctx, r.stat.AOFTimestamp)
if ret == AOFOk || ret == AOFTruncated {
log.Infof("The AOF File was successfully loaded")
Expand All @@ -88,8 +89,7 @@ func (r *aofReader) StartRead(ctx context.Context) []chan *entry.Entry {
log.Infof("Send single AOF finished. path=[%s]", r.path)
close(r.ch)
} else {
aofLoader := NewAOFFileInfo(r.path, r.ch)
ret := aofLoader.LoadAppendOnlyFile(ctx, manifestInfo, r.stat.AOFTimestamp)
ret := aofFileInfo.LoadAppendOnlyFile(ctx, manifestInfo, r.stat.AOFTimestamp)
if ret == AOFOk || ret == AOFTruncated {
log.Infof("The AOF File was successfully loaded")
} else {
Expand Down
8 changes: 5 additions & 3 deletions internal/reader/parsing_aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ func StringNeedsRepr(s string) int {
}

type INFO struct {
AOFName string
AOFDirName string
AOFUseRDBPreamble int // TODO:not support parsing rdb preamble
AOFUseRDBPreamble int
AOFManifest *AOFManifest
AOFFileName string
AOFCurrentSize int64
Expand All @@ -76,8 +77,9 @@ func (aofInfo *INFO) GetAOFDirName() string {
return aofInfo.AOFDirName
}

func NewAOFFileInfo(aofFilePath string, ch chan *entry.Entry) *INFO {
func NewAOFFileInfo(name, aofFilePath string, ch chan *entry.Entry) *INFO {
return &INFO{
AOFName: name,
AOFDirName: filepath.Dir(aofFilePath),
AOFUseRDBPreamble: 0,
AOFManifest: nil,
Expand Down Expand Up @@ -736,6 +738,6 @@ func (aofInfo *INFO) ParsingSingleAppendOnlyFile(ctx context.Context, FileName s
return AOFOk
}
// load single aof file
aofSingleReader := aof.NewLoader(MakePath(aofInfo.AOFDirName, FileName), aofInfo.ch)
aofSingleReader := aof.NewLoader(aofInfo.AOFName, aofInfo.AOFUseRDBPreamble, MakePath(aofInfo.AOFDirName, FileName), aofInfo.ch)
return aofSingleReader.LoadSingleAppendOnlyFile(ctx, AOFTimeStamp)
}
Loading