From dbf27ffa053abeb217b48c08433e57ab73cd441b Mon Sep 17 00:00:00 2001 From: Miroslav Crnic Date: Tue, 24 Mar 2026 10:59:10 +0000 Subject: [PATCH 1/5] log: add WARN log level between INFO and ERROR --- cpp/core/CommonOptions.hpp | 2 ++ cpp/core/Env.cpp | 3 +++ cpp/core/Env.hpp | 12 +++++++++++- go/core/log/log.go | 12 +++++++++++- 4 files changed, 27 insertions(+), 2 deletions(-) diff --git a/cpp/core/CommonOptions.hpp b/cpp/core/CommonOptions.hpp index 8911fb81..7f7ebe76 100644 --- a/cpp/core/CommonOptions.hpp +++ b/cpp/core/CommonOptions.hpp @@ -134,6 +134,8 @@ static inline bool parseLogOptions(CommandLineArgs& args, LogOptions& options) { options.logLevel = LogLevel::LOG_DEBUG; } else if (logLevel == "info") { options.logLevel = LogLevel::LOG_INFO; + } else if (logLevel == "warn") { + options.logLevel = LogLevel::LOG_WARN; } else if (logLevel == "error") { options.logLevel = LogLevel::LOG_ERROR; } else { diff --git a/cpp/core/Env.cpp b/cpp/core/Env.cpp index ce4fe709..5c61170c 100644 --- a/cpp/core/Env.cpp +++ b/cpp/core/Env.cpp @@ -20,6 +20,9 @@ std::ostream& operator<<(std::ostream& out, LogLevel ll) { case LogLevel::LOG_INFO: out << "INFO"; break; + case LogLevel::LOG_WARN: + out << "WARN"; + break; case LogLevel::LOG_ERROR: out << "ERROR"; break; diff --git a/cpp/core/Env.hpp b/cpp/core/Env.hpp index 342fb493..ad6b2a25 100644 --- a/cpp/core/Env.hpp +++ b/cpp/core/Env.hpp @@ -20,7 +20,8 @@ enum class LogLevel : uint32_t { LOG_TRACE = 0, LOG_DEBUG = 1, LOG_INFO = 2, - LOG_ERROR = 3, + LOG_WARN = 3, + LOG_ERROR = 4, }; std::ostream& operator<<(std::ostream& out, LogLevel ll); @@ -65,6 +66,8 @@ struct Logger { syslogLevel = 7; break; case LogLevel::LOG_INFO: syslogLevel = 6; break; + case LogLevel::LOG_WARN: + syslogLevel = 4; break; case LogLevel::LOG_ERROR: syslogLevel = 3; break; default: @@ -181,6 +184,13 @@ struct Env { } \ } while (false) +#define LOG_WARN(env, ...) \ + do { \ + if (likely((env).shouldLog(LogLevel::LOG_WARN))) { \ + (env)._log(LogLevel::LOG_WARN, VALIDATE_FORMAT(__VA_ARGS__)); \ + } \ + } while (false) + #define LOG_ERROR(env, ...) \ do { \ if (likely((env).shouldLog(LogLevel::LOG_ERROR))) { \ diff --git a/go/core/log/log.go b/go/core/log/log.go index 8635506a..d2de79b8 100644 --- a/go/core/log/log.go +++ b/go/core/log/log.go @@ -23,7 +23,8 @@ type LogLevel uint8 const TRACE LogLevel = 0 const DEBUG LogLevel = 1 const INFO LogLevel = 2 -const ERROR LogLevel = 3 +const WARN LogLevel = 3 +const ERROR LogLevel = 4 func (ll LogLevel) String() string { switch ll { @@ -33,6 +34,8 @@ func (ll LogLevel) String() string { return "DEBUG" case INFO: return "INFO" + case WARN: + return "WARN" case ERROR: return "ERROR" default: @@ -92,6 +95,9 @@ func (log *Logger) formatLog(level LogLevel, time time.Time, file string, line i case ERROR: levelColor = red syslogPrio = syslogError + case WARN: + levelColor = yellow + syslogPrio = syslogWarn case INFO: levelColor = blue syslogPrio = syslogInfo @@ -244,6 +250,10 @@ func (l *Logger) InfoStack(calldepth int, format string, v ...any) { l.LogStack(1+calldepth, INFO, format, v...) } +func (l *Logger) Warn(format string, v ...any) { + l.LogStack(1, WARN, format, v...) +} + // There should be very few times where you want an error log but not an alert. func (l *Logger) ErrorNoAlert(format string, v ...any) { l.LogStack(1, ERROR, format, v...) From 106a3c5795f2e5ea364f58e8a52acd342fcb31fb Mon Sep 17 00:00:00 2001 From: Miroslav Crnic Date: Tue, 24 Mar 2026 11:00:27 +0000 Subject: [PATCH 2/5] certificate: use stack-allocated buffers and add BlockWriteProof --- go/core/certificate/blockscert.go | 51 ++++++++++++++++--------------- go/terncli/terncli.go | 2 +- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/go/core/certificate/blockscert.go b/go/core/certificate/blockscert.go index 161a1561..c8bba801 100644 --- a/go/core/certificate/blockscert.go +++ b/go/core/certificate/blockscert.go @@ -5,36 +5,41 @@ package certificate import ( - "bytes" "crypto/cipher" "encoding/binary" "xtx/ternfs/core/cbcmac" "xtx/ternfs/msgs" ) -func BlockWriteCertificate(cipher cipher.Block, blockServiceId msgs.BlockServiceId, req *msgs.WriteBlockReq) [8]byte { - w := bytes.NewBuffer([]byte{}) - binary.Write(w, binary.LittleEndian, uint64(blockServiceId)) - w.Write([]byte{'w'}) - binary.Write(w, binary.LittleEndian, uint64(req.BlockId)) - binary.Write(w, binary.LittleEndian, uint32(req.Crc)) - binary.Write(w, binary.LittleEndian, uint32(req.Size)) - return cbcmac.CBCMAC(cipher, w.Bytes()) +func BlockWriteCertificate(cipher cipher.Block, blockServiceId msgs.BlockServiceId, blockId msgs.BlockId, crc msgs.Crc, size uint32) [8]byte { + var buf [25]byte + binary.LittleEndian.PutUint64(buf[0:8], uint64(blockServiceId)) + buf[8] = 'w' + binary.LittleEndian.PutUint64(buf[9:17], uint64(blockId)) + binary.LittleEndian.PutUint32(buf[17:21], uint32(crc)) + binary.LittleEndian.PutUint32(buf[21:25], size) + return cbcmac.CBCMAC(cipher, buf[:]) } func CheckBlockWriteCertificate(cipher cipher.Block, blockServiceId msgs.BlockServiceId, req *msgs.WriteBlockReq) ([8]byte, bool) { - expectedMac := BlockWriteCertificate(cipher, blockServiceId, req) + expectedMac := BlockWriteCertificate(cipher, blockServiceId, req.BlockId, req.Crc, req.Size) return expectedMac, expectedMac == req.Certificate } -func BlockEraseCertificate(blockServiceId msgs.BlockServiceId, blockId msgs.BlockId, key cipher.Block) [8]byte { - buf := bytes.NewBuffer([]byte{}) - // struct.pack_into(' Date: Tue, 24 Mar 2026 12:44:24 +0000 Subject: [PATCH 3/5] msgs: remove with_crc subdirectory from BlockId.Path() All blocks are now stored with page CRCs. The with_crc subdirectory level is no longer needed. The blockservice package will handle migrating existing on-disk data on startup. --- go/msgs/msgs.go | 2 +- go/ternblocks/ternblocks.go | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/go/msgs/msgs.go b/go/msgs/msgs.go index aece1d9e..5a541ab5 100644 --- a/go/msgs/msgs.go +++ b/go/msgs/msgs.go @@ -444,7 +444,7 @@ func (id BlockId) Path() string { h := sha1.New() h.Write([]byte(hex)) dir := fmt.Sprintf("%02x", h.Sum(nil)[0]) - return path.Join("with_crc", dir, hex) + return path.Join(dir, hex) } func MakeInodeId(typ InodeType, shard ShardId, id uint64) InodeId { diff --git a/go/ternblocks/ternblocks.go b/go/ternblocks/ternblocks.go index d4bfceff..effcb3ef 100644 --- a/go/ternblocks/ternblocks.go +++ b/go/ternblocks/ternblocks.go @@ -217,7 +217,7 @@ func updateBlockServiceInfoBlocks( ) error { t := time.Now() log.Info("starting to count blocks for %v", blockService.cachedInfo.Id) - blocksWithCrc, err := countBlocks(path.Join(blockService.path, "with_crc")) + blocksWithCrc, err := countBlocks(blockService.path) if err != nil { return err } @@ -1160,9 +1160,6 @@ func retrieveOrCreateKey(log *log.Logger, dir string) ([16]byte, error) { panic(err) } log.Info("creating directory structure") - if err := os.Mkdir(path.Join(dir, "with_crc"), 0755); err != nil && !os.IsExist(err) { - return [16]byte{}, fmt.Errorf("failed to create folder %s error: %v", path.Join(dir, "with_crc"), err) - } } else if read != 16 { return [16]byte{}, fmt.Errorf("short secret key (%v rather than 16 bytes)", read) } else { From 0843c1c05c562fa15d6e2bd908730ac8ce4f1ed1 Mon Sep 17 00:00:00 2001 From: Miroslav Crnic Date: Tue, 24 Mar 2026 12:49:56 +0000 Subject: [PATCH 4/5] blocks: add blockservice package with BlockService, CRC streams, and zero-copy fetch Split block storage operations into a reusable blockservice package: - BlockService: lifecycle management, write/erase/check/fetch operations - GetBlockFileForFetch: returns raw *os.File for zero-copy sendfile transfers - GetBlockReader: CRC-validating reader with optional CRC stripping - pageCrcReader/pageCrcWriter: page-level CRC32C validation streams - CGo-based fast block counting via getdents syscall - One-time with_crc directory migration on startup - Add blockservice unit tests --- go/ternblocks/blockservice/block.go | 103 +++ go/ternblocks/blockservice/blockservice.go | 692 ++++++++++++++++++ .../blockservice/blockservice_test.go | 371 ++++++++++ go/ternblocks/blockservice/countblocks.go | 90 +++ go/ternblocks/blockservice/crcstream.go | 165 +++++ go/ternblocks/blockservice/crcstream_test.go | 220 ++++++ go/ternblocks/blockservice/utils.go | 36 + 7 files changed, 1677 insertions(+) create mode 100644 go/ternblocks/blockservice/block.go create mode 100644 go/ternblocks/blockservice/blockservice.go create mode 100644 go/ternblocks/blockservice/blockservice_test.go create mode 100644 go/ternblocks/blockservice/countblocks.go create mode 100644 go/ternblocks/blockservice/crcstream.go create mode 100644 go/ternblocks/blockservice/crcstream_test.go create mode 100644 go/ternblocks/blockservice/utils.go diff --git a/go/ternblocks/blockservice/block.go b/go/ternblocks/blockservice/block.go new file mode 100644 index 00000000..eaaeaf3c --- /dev/null +++ b/go/ternblocks/blockservice/block.go @@ -0,0 +1,103 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: GPL-2.0-or-later + +package blockservice + +import ( + "errors" + "fmt" + "io" + "os" + "strings" + "sync/atomic" + "syscall" + "xtx/ternfs/msgs" + + "golang.org/x/sys/unix" +) + +type blockReader struct { + *pageCrcReader + bs *BlockService +} + +func newBlockReader(bs *BlockService, path string, offset uint32, count uint32, readAhead bool, stripCrc bool) (*blockReader, error) { + bs.logger.Debug("fetching block at path %v", path) + f, err := os.Open(path) + defer func() { + if f != nil { + f.Close() + } + }() + + if os.IsNotExist(err) { + return nil, msgs.BLOCK_NOT_FOUND + } + + if errors.Is(err, syscall.ENODATA) { + atomic.AddUint64(&bs.IoAttempts, 1) + atomic.AddUint64(&bs.IoErrors, 1) + bs.logger.RaiseHardwareEvent(strings.Split(bs.Path, ":")[0], bs.Id.String(), + fmt.Sprintf("could not open block %v, got ENODATA, this probably means that the block/disk is gone", path)) + return nil, syscall.EIO + } + + if err != nil { + return nil, err + } + + if offset%msgs.TERN_PAGE_SIZE != 0 { + bs.logger.Warn("trying to read from offset other than page boundary for path %v", path) + return nil, msgs.BLOCK_FETCH_OUT_OF_BOUNDS + } + if count%msgs.TERN_PAGE_SIZE != 0 { + bs.logger.Warn("trying to read count not multiple of page size for path %v", path) + return nil, msgs.BLOCK_FETCH_OUT_OF_BOUNDS + } + + fileOffset := int64(offset / msgs.TERN_PAGE_SIZE * msgs.TERN_PAGE_WITH_CRC_SIZE) + var pos int64 + if pos, err = f.Seek(fileOffset, 0); err != nil { + return nil, err + } + if pos != fileOffset { + return nil, msgs.BLOCK_FETCH_OUT_OF_BOUNDS + } + adviseCount := int64(count / msgs.TERN_PAGE_SIZE * msgs.TERN_PAGE_WITH_CRC_SIZE) + if readAhead { + var stat unix.Stat_t + if err := unix.Fstat(int(f.Fd()), &stat); err == nil { + adviseCount = stat.Size - fileOffset + } + unix.Fadvise(int(f.Fd()), fileOffset, adviseCount, unix.FADV_SEQUENTIAL|unix.FADV_WILLNEED) + } + + bs.wg.Add(1) + if bs.readC != nil { + bs.readC <- struct{}{} + } + reader := NewPageCrcReader(f, stripCrc) + f = nil + return &blockReader{ + pageCrcReader: reader, + bs: bs, + }, nil +} + +func (r *blockReader) Read(p []byte) (int, error) { + atomic.AddUint64(&r.bs.IoAttempts, 1) + read, err := r.pageCrcReader.Read(p) + if err != nil && err != io.EOF { + atomic.AddUint64(&r.bs.IoErrors, 1) + } + return read, err +} + +func (r *blockReader) Close() error { + if r.bs.readC != nil { + <-r.bs.readC + } + r.bs.wg.Done() + return r.pageCrcReader.Close() +} diff --git a/go/ternblocks/blockservice/blockservice.go b/go/ternblocks/blockservice/blockservice.go new file mode 100644 index 00000000..1637ab9d --- /dev/null +++ b/go/ternblocks/blockservice/blockservice.go @@ -0,0 +1,692 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: GPL-2.0-or-later + +package blockservice + +import ( + "crypto/aes" + "crypto/cipher" + crand "crypto/rand" + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "path" + "path/filepath" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + "xtx/ternfs/core/bufpool" + "xtx/ternfs/core/certificate" + "xtx/ternfs/core/crc32c" + "xtx/ternfs/core/log" + "xtx/ternfs/msgs" + + "golang.org/x/sys/unix" +) + +type BlockServiceStats struct { + BlocksWritten uint64 + BytesWritten uint64 + BlocksErased uint64 + BlocksFetched uint64 + BytesFetched uint64 + BlocksChecked uint64 + BytesChecked uint64 + BlocksNotFound uint64 + IoErrors uint64 + IoAttempts uint64 +} + +type BlockServiceOptions struct { + BufferSize int + MaxConcurrentWrites int + MaxConcurrentReads int + FutureCutoff time.Duration + PastCutoff time.Duration + ReservedCapacityBytes uint64 +} + +type BlockService struct { + msgs.RegisterBlockServiceInfo + BlockServiceStats + DevId string + + options *BlockServiceOptions + + logger *log.Logger + bufPool *bufpool.BufPool + localPath string + + secretFile *os.File + cipher cipher.Block + + toDecommission atomic.Bool + active atomic.Bool + + stopC chan struct{} + eraseCheckC chan struct{} + writeC chan struct{} + readC chan struct{} + wg sync.WaitGroup +} + +const SECRET_FILE_NAME = "secret.key" +const INFO_REFRESH_PERIOD = time.Minute * 5 +const MAX_OBJECT_SIZE uint32 = 100 << 20 // 100MB + +func OpenBlockService(logger *log.Logger, blockServiceOptions *BlockServiceOptions, bufPool *bufpool.BufPool, blockServiceInfo msgs.RegisterBlockServiceInfo, devId string) *BlockService { + localPath := blockServiceInfo.Path + if pathSegments := strings.Split(blockServiceInfo.Path, ":"); len(pathSegments) > 1 { + localPath = pathSegments[1] + } + + var writeC chan struct{} + if blockServiceOptions.MaxConcurrentWrites > 0 { + writeC = make(chan struct{}, blockServiceOptions.MaxConcurrentWrites) + } + + var readC chan struct{} + if blockServiceOptions.MaxConcurrentReads > 0 { + readC = make(chan struct{}, blockServiceOptions.MaxConcurrentReads) + } + + blockService := &BlockService{ + RegisterBlockServiceInfo: blockServiceInfo, + DevId: devId, + options: blockServiceOptions, + logger: logger, + bufPool: bufPool, + localPath: localPath, + stopC: make(chan struct{}), + eraseCheckC: make(chan struct{}, 1), + writeC: writeC, + readC: readC, + } + + cipherBlock, err := aes.NewCipher(blockService.SecretKey[:]) + if err != nil { + logger.Warn("failed creating cipher for block service %v, err: %v", blockService.Id, err) + } else { + blockService.cipher = cipherBlock + } + + migrateWithCrcDir(logger, localPath) + blockService.startInfoUpdater() + return blockService +} + +func (b *BlockService) Close() { + close(b.stopC) + b.wg.Wait() + if b.secretFile != nil { + if err := syscall.Flock(int(b.secretFile.Fd()), syscall.LOCK_UN); err != nil { + b.logger.Warn("failed unlocking secret file for block service %v, err: %v", b.Id, err) + } + if err := b.secretFile.Close(); err != nil { + b.logger.Warn("failed closing secret file for block service %v, err: %v", b.Id, err) + } + b.secretFile = nil + } +} + +func (b *BlockService) ToDecommission() bool { + return b.toDecommission.Load() +} + +func (b *BlockService) Active() bool { + return b.active.Load() +} + +func (b *BlockService) Cipher() cipher.Block { + return b.cipher +} + +func (b *BlockService) LocalPath() string { + return b.localPath +} + +// GetBlockReader returns a reader that validates page CRCs and optionally strips them. +// Used for FetchBlock (strip CRC) and CheckBlock (verify CRC). +func (b *BlockService) GetBlockReader(blockId msgs.BlockId, offset uint32, count uint32, readAhead bool, stripCrc bool) (*blockReader, error) { + if err := b.acquireWgIfActive(); err != nil { + return nil, err + } + defer b.wg.Done() + atomic.AddUint64(&b.BlocksFetched, 1) + atomic.AddUint64(&b.BytesFetched, uint64(count)) + return newBlockReader(b, path.Join(b.localPath, blockId.Path()), offset, count, readAhead, stripCrc) +} + +// GetBlockFileForFetch returns a raw file handle for zero-copy transfer via conn.ReadFrom. +// The file is seeked to the correct offset. Caller must close the file and call +// ReleaseBlockFile when done. +// Returns the file, the number of bytes to read, and any error. +func (b *BlockService) GetBlockFileForFetch(blockId msgs.BlockId, offset uint32, count uint32, readAhead bool) (*os.File, int64, error) { + if err := b.acquireWgIfActive(); err != nil { + return nil, 0, err + } + defer b.wg.Done() + + if offset%msgs.TERN_PAGE_SIZE != 0 { + b.logger.RaiseAlert("trying to read from offset other than page boundary") + return nil, 0, msgs.BLOCK_FETCH_OUT_OF_BOUNDS + } + if count%msgs.TERN_PAGE_SIZE != 0 { + b.logger.RaiseAlert("trying to read count which is not a multiple of page size") + return nil, 0, msgs.BLOCK_FETCH_OUT_OF_BOUNDS + } + + pageCount := count / msgs.TERN_PAGE_SIZE + offsetPageCount := offset / msgs.TERN_PAGE_SIZE + blockPath := path.Join(b.localPath, blockId.Path()) + b.logger.Debug("fetching block id %v (%v -> %v) at path %v", blockId, offset, count, blockPath) + + f, err := os.Open(blockPath) + if errors.Is(err, syscall.ENODATA) { + b.logger.RaiseHardwareEvent(strings.Split(b.Path, ":")[0], b.Id.String(), + fmt.Sprintf("could not open block %v, got ENODATA, this probably means that the block/disk is gone", blockPath)) + return nil, 0, syscall.EIO + } + if os.IsNotExist(err) { + b.logger.ErrorNoAlert("could not find block to fetch at path %v", blockPath) + return nil, 0, msgs.BLOCK_NOT_FOUND + } + if err != nil { + return nil, 0, err + } + + fi, err := f.Stat() + if err != nil { + f.Close() + return nil, 0, err + } + filePageCount := uint32(fi.Size()) / msgs.TERN_PAGE_WITH_CRC_SIZE + if offsetPageCount+pageCount > filePageCount { + f.Close() + b.logger.RaiseAlert("malformed request for block %v. requested read at [%d - %d] but stored block size is %d", + blockId, offset, offset+count, filePageCount*msgs.TERN_PAGE_SIZE) + return nil, 0, msgs.BLOCK_FETCH_OUT_OF_BOUNDS + } + + fileOffset := int64(offsetPageCount * msgs.TERN_PAGE_WITH_CRC_SIZE) + byteCount := int64(pageCount * msgs.TERN_PAGE_WITH_CRC_SIZE) + + if readAhead { + unix.Fadvise(int(f.Fd()), fileOffset, fi.Size(), unix.FADV_SEQUENTIAL|unix.FADV_WILLNEED) + } + + if _, err := f.Seek(fileOffset, 0); err != nil { + f.Close() + return nil, 0, err + } + + atomic.AddUint64(&b.BlocksFetched, 1) + atomic.AddUint64(&b.BytesFetched, uint64(count)) + + if b.readC != nil { + b.readC <- struct{}{} + } + b.wg.Add(1) + return f, byteCount, nil +} + +// ReleaseBlockFile releases concurrency control acquired by GetBlockFileForFetch. +func (b *BlockService) ReleaseBlockFile() { + if b.readC != nil { + <-b.readC + } + b.wg.Done() +} + +func (b *BlockService) TestWrite(size uint32, dataReader io.Reader) error { + if err := b.acquireWgIfActive(); err != nil { + return err + } + defer b.wg.Done() + + if size > MAX_OBJECT_SIZE { + return msgs.BLOCK_TOO_BIG + } + + if b.writeC != nil { + b.writeC <- struct{}{} + defer func() { <-b.writeC }() + } + + atomic.AddUint64(&b.IoAttempts, 1) + f, err := os.CreateTemp(b.localPath, "tmp.") + if err != nil { + atomic.AddUint64(&b.IoErrors, 1) + return err + } + tmpName := f.Name() + defer func() { + f.Close() + os.Remove(tmpName) + }() + + w := NewPageCrcWriter(f) + atomic.AddUint64(&b.IoAttempts, 1) + buf := b.bufPool.Get(b.options.BufferSize) + defer b.bufPool.Put(buf) + written, err := io.CopyBuffer(w, io.LimitReader(dataReader, int64(size)), buf.Bytes()) + atomic.AddUint64(&b.BytesWritten, uint64(written)) + if err != nil { + atomic.AddUint64(&b.IoErrors, 1) + return err + } + if err = f.Sync(); err != nil { + atomic.AddUint64(&b.IoErrors, 1) + return err + } + return nil +} + +func (b *BlockService) WriteBlock(blockId msgs.BlockId, cert [8]byte, expectedCrc msgs.Crc, size uint32, dataReader io.Reader) ([8]byte, error) { + var proof [8]byte + if err := b.acquireWgIfActive(); err != nil { + return proof, err + } + defer b.wg.Done() + filePath := path.Join(b.localPath, blockId.Path()) + b.logger.Debug("writing block %v at path %v", blockId, filePath) + + if size > MAX_OBJECT_SIZE { + return proof, msgs.BLOCK_TOO_BIG + } + + expectedCert := certificate.BlockWriteCertificate(b.cipher, b.Id, blockId, expectedCrc, size) + if cert != expectedCert { + b.logger.Warn("bad certificate for block %v: %v != %v", blockId, cert, expectedCert) + return proof, msgs.BAD_CERTIFICATE + } + + now := time.Now() + pastCutoff := now.Add(-b.options.PastCutoff) + futureCutoff := now.Add(b.options.FutureCutoff) + blockTime := msgs.TernTime(uint64(blockId)).Time() + + if blockTime.Before(pastCutoff) { + b.logger.Info("block %v too old for write: %v < %v", blockId, blockTime, pastCutoff) + return proof, msgs.BLOCK_TOO_OLD_FOR_WRITE + } + + if blockTime.After(futureCutoff) { + panic(fmt.Errorf("block %v is in the future! (now=%v, futureCutoff=%v)", blockId, now, futureCutoff)) + } + + if b.writeC != nil { + b.writeC <- struct{}{} + defer func() { <-b.writeC }() + } + + atomic.AddUint64(&b.IoAttempts, 1) + if err := os.Mkdir(path.Dir(filePath), 0777); err != nil && !os.IsExist(err) { + atomic.AddUint64(&b.IoErrors, 1) + return proof, err + } + atomic.AddUint64(&b.IoAttempts, 1) + f, err := os.CreateTemp(b.localPath, "tmp.") + if err != nil { + atomic.AddUint64(&b.IoErrors, 1) + return proof, err + } + tmpName := f.Name() + defer func() { + if f != nil { + f.Close() + } + if err != nil { + os.Remove(tmpName) + } + }() + + w := NewPageCrcWriter(f) + atomic.AddUint64(&b.IoAttempts, 1) + written, err := io.CopyN(w, dataReader, int64(size)) + atomic.AddUint64(&b.BytesWritten, uint64(written)) + if err != nil { + atomic.AddUint64(&b.IoErrors, 1) + return proof, err + } + actualCrc, err := w.GetCrc() + if err != nil { + return proof, err + } + if msgs.Crc(actualCrc) != expectedCrc { + err = msgs.BAD_BLOCK_CRC + return proof, err + } + atomic.AddUint64(&b.IoAttempts, 1) + if err = f.Sync(); err != nil { + atomic.AddUint64(&b.IoErrors, 1) + return proof, err + } + + if err = f.Close(); err != nil { + return proof, err + } + f = nil + + // Check again after write — file transfer may have taken a while + now = time.Now() + pastCutoff = now.Add(-b.options.PastCutoff) + if blockTime.Before(pastCutoff) { + b.logger.Info("block %v too old for write: %v < %v", blockId, blockTime, pastCutoff) + return proof, msgs.BLOCK_TOO_OLD_FOR_WRITE + } + + atomic.AddUint64(&b.IoAttempts, 1) + err = moveFileAndSyncDir(tmpName, filePath) + if err != nil { + atomic.AddUint64(&b.IoErrors, 1) + return proof, err + } + + atomic.AddUint64(&b.BlocksWritten, 1) + proof = certificate.BlockWriteProof(b.cipher, b.Id, blockId) + return proof, nil +} + +func (b *BlockService) CheckBlock(blockId msgs.BlockId, expectedSize uint32, crc msgs.Crc) error { + if err := b.acquireWgIfActive(); err != nil { + return err + } + defer b.wg.Done() + + b.eraseCheckC <- struct{}{} + defer func() { <-b.eraseCheckC }() + + blockPath := path.Join(b.localPath, blockId.Path()) + b.logger.Debug("checking block %v at path %v", blockId, blockPath) + + atomic.AddUint64(&b.BlocksChecked, 1) + atomic.AddUint64(&b.BytesChecked, uint64(expectedSize)) + + f, err := newBlockReader(b, blockPath, 0, expectedSize, true, true) + if err != nil { + return err + } + defer func() { + if f != nil { + f.Close() + } + }() + + buf := b.bufPool.Get(b.options.BufferSize) + defer b.bufPool.Put(buf) + written, err := io.CopyBuffer(io.Discard, io.LimitReader(f, int64(expectedSize)), buf.Bytes()) + if err != nil && err != io.EOF { + return err + } + + if written != int64(expectedSize) { + return msgs.BAD_BLOCK_CRC + } + + if err = f.Close(); err != nil { + f = nil + return err + } + if readCrc, err := f.pageCrcReader.GetCrc(); err != nil || readCrc != uint32(crc) { + return msgs.BAD_BLOCK_CRC + } + f = nil + return nil +} + +func (b *BlockService) EraseBlock(blockId msgs.BlockId, cert [8]byte) ([8]byte, error) { + var proof [8]byte + if err := b.acquireWgIfActive(); err != nil { + return proof, err + } + defer b.wg.Done() + + expectedCert := certificate.BlockEraseCertificate(b.Id, blockId, b.cipher) + if expectedCert != cert { + b.logger.RaiseAlert("bad MAC, got %v, expected %v", cert, expectedCert) + return proof, msgs.BAD_CERTIFICATE + } + + now := time.Now() + pastCutoff := now.Add(-b.options.PastCutoff) + blockTime := msgs.TernTime(uint64(blockId)).Time() + + if blockTime.After(pastCutoff) { + return proof, msgs.BLOCK_TOO_RECENT_FOR_DELETION + } + + b.eraseCheckC <- struct{}{} + defer func() { <-b.eraseCheckC }() + atomic.AddUint64(&b.IoAttempts, 1) + + blockPath := path.Join(b.localPath, blockId.Path()) + b.logger.Debug("deleting block %v at path %v", blockId, blockPath) + err := eraseFileIfExistsAndSyncDir(blockPath) + if err != nil { + atomic.AddUint64(&b.IoErrors, 1) + return proof, err + } + atomic.AddUint64(&b.BlocksErased, 1) + proof = certificate.BlockEraseProof(b.Id, blockId, b.cipher) + return proof, nil +} + +func (b *BlockService) acquireWgIfActive() error { + b.wg.Add(1) + if b.active.Load() { + return nil + } + b.wg.Done() + return msgs.BLOCK_SERVICE_NOT_FOUND +} + +func (b *BlockService) startInfoUpdater() { + b.wg.Add(1) + go func(b *BlockService) { + defer b.wg.Done() + defer b.active.Store(false) + ticker := time.NewTicker(INFO_REFRESH_PERIOD) + for { + succeed := b.checkSecret() + succeed = succeed && b.updateCapacity() + succeed = succeed && b.countBlocks() + b.toDecommission.Store(!succeed) + + b.active.Store(b.secretFile != nil && b.cipher != nil) + select { + case <-b.stopC: + return + case <-ticker.C: + } + } + }(b) +} + +func (b *BlockService) checkSecret() bool { + var err error + secretFile := b.secretFile + + if b.secretFile == nil { + keyFilePath := path.Join(b.localPath, SECRET_FILE_NAME) + secretFile, err = os.Open(keyFilePath) + if err != nil { + b.logger.Warn("could not open secret file for block service %v, path: %v, err: %v", b.Id, keyFilePath, err) + return false + } + if err := syscall.Flock(int(secretFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil { + b.logger.Warn("could not lock secret file for block service %v, path: %v, err: %v", b.Id, keyFilePath, err) + return false + } + } + defer func() { + if b.secretFile != nil { + return + } + if err := syscall.Flock(int(secretFile.Fd()), syscall.LOCK_UN); err != nil { + b.logger.Warn("failed unlocking secret file for block service %v, err: %v", b.Id, err) + } + if err := secretFile.Close(); err != nil { + b.logger.Warn("failed closing secret file for block service %v, err: %v", b.Id, err) + } + }() + + if _, err = secretFile.Seek(0, 0); err != nil { + b.logger.Warn("could not seek secret file for block service %v err: %v", b.Id, err) + return false + } + var key [16]byte + var read int + read, err = secretFile.Read(key[:]) + if err != nil { + b.logger.Warn("could not read secret file for block service %v err: %v", b.Id, err) + return false + } + if read != len(key) { + b.logger.Warn("truncated secret file for block service %v length: %v", b.Id, read) + return false + } + expectedKeyCrc := crc32c.Sum(0, key[:]) + var actualKeyCrc uint32 + if err := binary.Read(secretFile, binary.LittleEndian, &actualKeyCrc); err != nil { + b.logger.Warn("could not read secret file for block service %v err: %v", b.Id, err) + return false + } + if expectedKeyCrc != actualKeyCrc { + b.logger.Warn("crc mismatch in secret file for block service %v", b.Id) + return false + } + blockServiceId := BlockServiceIdFromKey(key) + if blockServiceId != b.Id { + b.logger.Warn("blockServiceId mismatch in secret file for block service %v blockService in file: %v", b.Id, blockServiceId) + return false + } + for i := range key { + if b.SecretKey[i] != key[i] { + b.logger.Warn("key mismatch in secret file for block service %v", b.Id) + return false + } + } + b.secretFile = secretFile + return true +} + +func (b *BlockService) updateCapacity() bool { + var statfs unix.Statfs_t + if err := unix.Statfs(path.Join(b.localPath, SECRET_FILE_NAME), &statfs); err != nil { + b.logger.Warn("could not update capacity for block service %v, err: %v", b.Id, err) + return false + } + capacityBytes := statfs.Blocks * uint64(statfs.Bsize) + availableBytes := statfs.Bavail * uint64(statfs.Bsize) + + capacityBytes -= min(capacityBytes, b.options.ReservedCapacityBytes) + availableBytes -= min(availableBytes, b.options.ReservedCapacityBytes) + + atomic.StoreUint64(&b.CapacityBytes, capacityBytes) + atomic.StoreUint64(&b.AvailableBytes, availableBytes) + return true +} + +func (b *BlockService) countBlocks() bool { + blocks, err := CountBlocks(b.localPath) + if err != nil { + b.logger.Warn("could not count blocks for block service %v, err: %v", b.Id, err) + return false + } + atomic.StoreUint64(&b.Blocks, blocks) + return true +} + +func BlockServiceIdFromKey(secretKey [16]byte) msgs.BlockServiceId { + return msgs.BlockServiceId(binary.LittleEndian.Uint64(secretKey[:8]) & uint64(0x7FFFFFFFFFFFFFFF)) +} + +func CreateSecret(log *log.Logger, secretPath string) *[16]byte { + var key [16]byte + + log.Info("creating new secret key at %s", secretPath) + if _, err := crand.Read(key[:]); err != nil { + log.Warn("failed creating secret at %s, err: %v", secretPath, err) + return nil + } + + var err error + var keyFile *os.File + if keyFile, err = os.OpenFile(secretPath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600); err != nil { + log.Warn("failed creating new secret file at %s, err: %v", secretPath, err) + return nil + } + defer func() { + if err := keyFile.Close(); err != nil { + log.Warn("failed closing secret file at %s, err: %v", secretPath, err) + } + }() + + if _, err := keyFile.Seek(0, 0); err != nil { + log.Warn("failed seeking in secret file at %s, err: %v", secretPath, err) + return nil + } + if _, err := keyFile.Write(key[:]); err != nil { + log.Warn("failed creating secret at %s, err: %v", secretPath, err) + return nil + } + keyCrc := crc32c.Sum(0, key[:]) + if err := binary.Write(keyFile, binary.LittleEndian, keyCrc); err != nil { + log.Warn("failed creating secret at %s, err: %v", secretPath, err) + return nil + } + if err = keyFile.Sync(); err != nil { + log.Warn("failed syncing secret at %s, err: %v", secretPath, err) + return nil + } + dir, err := os.Open(filepath.Dir(secretPath)) + if err != nil { + log.Warn("failed opening secret dir at %s, err: %v", secretPath, err) + return nil + } + defer func() { + if err := dir.Close(); err != nil { + log.Warn("failed closing secret file dir at %s, err: %v", secretPath, err) + } + }() + if err = dir.Sync(); err != nil { + log.Warn("failed syncing secret dir at %s, err: %v", secretPath, err) + return nil + } + + return &key +} + +// migrateWithCrcDir moves all subdirectories from with_crc/ up one level. +// This is a one-time migration for production data that stored blocks under with_crc/. +func migrateWithCrcDir(logger *log.Logger, localPath string) { + withCrcPath := path.Join(localPath, "with_crc") + entries, err := os.ReadDir(withCrcPath) + if err != nil { + return + } + + logger.Info("migrating with_crc directory for %v", localPath) + for _, entry := range entries { + src := path.Join(withCrcPath, entry.Name()) + dst := path.Join(localPath, entry.Name()) + if err := os.Rename(src, dst); err != nil { + if os.IsExist(err) { + continue + } + logger.Warn("failed migrating %v to %v: %v", src, dst, err) + return + } + } + if err := os.Remove(withCrcPath); err != nil { + logger.Warn("failed removing empty with_crc directory %v: %v", withCrcPath, err) + } else { + logger.Info("with_crc migration complete for %v", localPath) + } +} diff --git a/go/ternblocks/blockservice/blockservice_test.go b/go/ternblocks/blockservice/blockservice_test.go new file mode 100644 index 00000000..db6c081c --- /dev/null +++ b/go/ternblocks/blockservice/blockservice_test.go @@ -0,0 +1,371 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: GPL-2.0-or-later + +package blockservice + +import ( + "bytes" + "crypto/rand" + "errors" + "io" + "os" + "path/filepath" + "testing" + "time" + + "xtx/ternfs/core/certificate" + "xtx/ternfs/core/crc32c" + "xtx/ternfs/core/log" + "xtx/ternfs/msgs" + + "xtx/ternfs/core/bufpool" +) + +func testLogger() *log.Logger { + return log.NewLogger(os.Stdout, &log.LoggerOptions{Level: log.DEBUG, Syslog: false, PrintQuietAlerts: true}) +} + +// createTestService creates a BlockService with a secret file on disk and waits for it to become active. +func createTestService(t *testing.T) *BlockService { + t.Helper() + tmpDir := t.TempDir() + logger := testLogger() + pool := bufpool.NewBufPool() + + secretPath := filepath.Join(tmpDir, SECRET_FILE_NAME) + key := CreateSecret(logger, secretPath) + if key == nil { + t.Fatal("Failed to create secret") + } + + id := BlockServiceIdFromKey(*key) + bsInfo := msgs.RegisterBlockServiceInfo{ + Id: id, + Path: "test:" + tmpDir, + SecretKey: *key, + } + + bs := OpenBlockService(logger, &BlockServiceOptions{ + BufferSize: 1024 * 1024, + FutureCutoff: time.Minute, + PastCutoff: time.Minute, + }, pool, bsInfo, "dev1") + + deadline := time.Now().Add(5 * time.Second) + for !bs.Active() { + if time.Now().After(deadline) { + bs.Close() + t.Fatal("Timed out waiting for block service to become active") + } + time.Sleep(10 * time.Millisecond) + } + return bs +} + +// writeTestBlockDirect writes block data directly to disk (bypassing WriteBlock). +func writeTestBlockDirect(t *testing.T, bs *BlockService, id msgs.BlockId, data []byte) { + t.Helper() + blockPath := filepath.Join(bs.localPath, id.Path()) + if err := os.MkdirAll(filepath.Dir(blockPath), 0755); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(blockPath, data, 0644); err != nil { + t.Fatal(err) + } +} + +func TestBlockServiceLifecycle(t *testing.T) { + bs := createTestService(t) + defer bs.Close() + + if bs.Id == msgs.BlockServiceId(0) { + t.Error("Invalid service ID generated") + } + if !bs.Active() { + t.Error("Service should be active") + } +} + +func TestWriteBlock(t *testing.T) { + bs := createTestService(t) + defer bs.Close() + + testData := make([]byte, 10*int(msgs.TERN_PAGE_SIZE)) + if _, err := rand.Read(testData); err != nil { + t.Fatal(err) + } + + t.Run("SuccessfulWrite", func(t *testing.T) { + blockID := msgs.BlockId(msgs.Now()) + crc := msgs.Crc(crc32c.Sum(0, testData)) + size := uint32(len(testData)) + writeCert := certificate.BlockWriteCertificate(bs.cipher, bs.Id, blockID, crc, size) + proof, err := bs.WriteBlock(blockID, writeCert, crc, size, bytes.NewReader(testData)) + if err != nil { + t.Fatalf("WriteBlock failed: %v", err) + } + + expectedProof := certificate.BlockWriteProof(bs.cipher, bs.Id, blockID) + if proof != expectedProof { + t.Errorf("Invalid write proof. Got %v, expected %v", proof, expectedProof) + } + + blockPath := filepath.Join(bs.localPath, blockID.Path()) + if _, err := os.Stat(blockPath); err != nil { + t.Errorf("Block file not created: %v", err) + } + }) + + t.Run("BadBlockCRC", func(t *testing.T) { + blockID := msgs.BlockId(msgs.Now()) + badCrc := msgs.Crc(0xDEADBEEF) + size := uint32(len(testData)) + writeCert := certificate.BlockWriteCertificate(bs.cipher, bs.Id, blockID, badCrc, size) + + _, err := bs.WriteBlock(blockID, writeCert, badCrc, size, bytes.NewReader(testData)) + if !errors.Is(err, msgs.BAD_BLOCK_CRC) { + t.Errorf("Expected BAD_BLOCK_CRC error, got: %v", err) + } + }) + + t.Run("BadWriteCertificate", func(t *testing.T) { + blockID := msgs.BlockId(msgs.Now()) + badCert := [8]byte{0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE, 0xBA, 0xBE} + _, err := bs.WriteBlock(blockID, badCert, msgs.Crc(0), 100, bytes.NewReader(make([]byte, 100))) + if !errors.Is(err, msgs.BAD_CERTIFICATE) { + t.Errorf("Expected BAD_CERTIFICATE error, got: %v", err) + } + }) + + t.Run("BlockTooBig", func(t *testing.T) { + blockID := msgs.BlockId(msgs.Now()) + crc := msgs.Crc(0) + size := MAX_OBJECT_SIZE + 1 + writeCert := certificate.BlockWriteCertificate(bs.cipher, bs.Id, blockID, crc, size) + _, err := bs.WriteBlock(blockID, writeCert, crc, size, bytes.NewReader(make([]byte, size))) + if !errors.Is(err, msgs.BLOCK_TOO_BIG) { + t.Errorf("Expected BLOCK_TOO_BIG error, got: %v", err) + } + }) + + t.Run("BlockTooOldForWrite", func(t *testing.T) { + blockID := msgs.BlockId(msgs.MakeTernTime(time.Now().Add(-2 * time.Hour))) + crc := msgs.Crc(0) + size := uint32(100) + writeCert := certificate.BlockWriteCertificate(bs.cipher, bs.Id, blockID, crc, size) + _, err := bs.WriteBlock(blockID, writeCert, crc, size, bytes.NewReader(make([]byte, size))) + if !errors.Is(err, msgs.BLOCK_TOO_OLD_FOR_WRITE) { + t.Errorf("Expected BLOCK_TOO_OLD_FOR_WRITE error, got: %v", err) + } + }) + + t.Run("TempFileCleanupOnError", func(t *testing.T) { + blockID := msgs.BlockId(msgs.Now()) + crc := msgs.Crc(crc32c.Sum(0, testData)) + size := uint32(len(testData)) + writeCert := certificate.BlockWriteCertificate(bs.cipher, bs.Id, blockID, crc, size) + + // Reader that returns no data — will cause CRC mismatch + _, _ = bs.WriteBlock(blockID, writeCert, crc, size, io.LimitReader(bytes.NewReader(nil), 0)) + + tmpFiles, _ := filepath.Glob(filepath.Join(bs.localPath, "tmp.*")) + if len(tmpFiles) > 0 { + t.Errorf("Temporary files not cleaned up: %v", tmpFiles) + } + }) +} + +func TestBlockOperations(t *testing.T) { + bs := createTestService(t) + defer bs.Close() + + testData := make([]byte, 10*int(msgs.TERN_PAGE_SIZE)) + if _, err := rand.Read(testData); err != nil { + t.Fatal(err) + } + + // Pre-compute block data with CRCs for direct writes + var crcBuf bytes.Buffer + writer := NewPageCrcWriter(&crcBuf) + writer.Write(testData) + blockData := crcBuf.Bytes() + + t.Run("WriteAndReadBlock", func(t *testing.T) { + blockID := msgs.BlockId(12345) + writeTestBlockDirect(t, bs, blockID, blockData) + + reader, err := bs.GetBlockReader(blockID, 0, uint32(len(testData)), true, true) + if err != nil { + t.Fatal(err) + } + defer reader.Close() + + buf := make([]byte, len(testData)) + _, err = io.ReadFull(reader, buf) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(testData, buf) { + t.Error("Read data doesn't match written data") + } + }) + + t.Run("CheckBlock", func(t *testing.T) { + blockID := msgs.BlockId(12345) + writeTestBlockDirect(t, bs, blockID, blockData) + + err := bs.CheckBlock(blockID, uint32(len(testData)), msgs.Crc(crc32c.Sum(0, testData))) + if err != nil { + t.Errorf("Block check failed: %v", err) + } + }) + + t.Run("CheckCorruptedBlock", func(t *testing.T) { + blockID := msgs.BlockId(12345) + writeTestBlockDirect(t, bs, blockID, blockData) + + corruptBlockPath := filepath.Join(bs.localPath, blockID.Path()) + data, err := os.ReadFile(corruptBlockPath) + if err != nil { + t.Fatal(err) + } + data[0] ^= 1 + if err := os.WriteFile(corruptBlockPath, data, 0644); err != nil { + t.Fatal(err) + } + + err = bs.CheckBlock(blockID, uint32(len(testData)), msgs.Crc(crc32c.Sum(0, testData))) + if err == nil { + t.Error("CheckBlock did not detect corrupted block") + } + if !errors.Is(err, msgs.BAD_BLOCK_CRC) { + t.Errorf("CheckBlock returned unexpected error: %v", err) + } + }) + + t.Run("EraseBlock", func(t *testing.T) { + blockID := msgs.BlockId(12345) + writeTestBlockDirect(t, bs, blockID, blockData) + + cert := certificate.BlockEraseCertificate(bs.Id, blockID, bs.cipher) + proof, err := bs.EraseBlock(blockID, cert) + if err != nil { + t.Fatal(err) + } + + if proof != certificate.BlockEraseProof(bs.Id, blockID, bs.cipher) { + t.Errorf("EraseBlock returned incorrect proof: %v", proof) + } + + _, err = os.Stat(filepath.Join(bs.localPath, blockID.Path())) + if !os.IsNotExist(err) { + t.Error("Block file not deleted") + } + }) + + t.Run("EraseBlockInvalidCertificate", func(t *testing.T) { + blockID := msgs.BlockId(12345) + writeTestBlockDirect(t, bs, blockID, blockData) + + badBlockID := msgs.BlockId(99999) + badCert := certificate.BlockEraseCertificate(bs.Id, badBlockID, bs.cipher) + + _, err := bs.EraseBlock(blockID, badCert) + if err != msgs.BAD_CERTIFICATE { + t.Errorf("Expected BAD_CERTIFICATE error, got: %v", err) + } + + if _, err := os.Stat(filepath.Join(bs.localPath, blockID.Path())); err != nil { + t.Error("Block should not have been deleted") + } + }) + + t.Run("GetBlockFileForFetch", func(t *testing.T) { + blockID := msgs.BlockId(12345) + writeTestBlockDirect(t, bs, blockID, blockData) + + f, byteCount, err := bs.GetBlockFileForFetch(blockID, 0, uint32(len(testData)), false) + if err != nil { + t.Fatal(err) + } + defer func() { + f.Close() + bs.ReleaseBlockFile() + }() + + expectedByteCount := int64(len(blockData)) + if byteCount != expectedByteCount { + t.Errorf("Expected byte count %d, got %d", expectedByteCount, byteCount) + } + + buf := make([]byte, byteCount) + _, err = io.ReadFull(f, buf) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(buf, blockData) { + t.Error("File data doesn't match written data") + } + }) +} + +func TestCapacityTracking(t *testing.T) { + bs := createTestService(t) + defer bs.Close() + + t.Run("InitialCapacity", func(t *testing.T) { + if !bs.updateCapacity() { + t.Error("Failed to update capacity") + } + if bs.CapacityBytes == 0 { + t.Error("Invalid capacity reporting") + } + }) + + t.Run("BlockCount", func(t *testing.T) { + testData := make([]byte, int(msgs.TERN_PAGE_SIZE)) + if _, err := rand.Read(testData); err != nil { + t.Fatal(err) + } + var buf bytes.Buffer + writer := NewPageCrcWriter(&buf) + writer.Write(testData) + blockData := buf.Bytes() + + const BLOCK_COUNT = 10 + for i := range BLOCK_COUNT { + blockID := msgs.BlockId(12345 + i) + writeTestBlockDirect(t, bs, blockID, blockData) + } + + if !bs.countBlocks() { + t.Error("Failed to count blocks") + } + if bs.Blocks != BLOCK_COUNT { + t.Errorf("Incorrect block count: expected %d, got %d", BLOCK_COUNT, bs.Blocks) + } + }) +} + +func TestMigrateWithCrcDir(t *testing.T) { + tmpDir := t.TempDir() + logger := testLogger() + + // Create with_crc directory with subdirectories + withCrcPath := filepath.Join(tmpDir, "with_crc") + os.MkdirAll(filepath.Join(withCrcPath, "00"), 0755) + os.MkdirAll(filepath.Join(withCrcPath, "01"), 0755) + os.WriteFile(filepath.Join(withCrcPath, "00", "testblock"), []byte("data"), 0644) + + migrateWithCrcDir(logger, tmpDir) + + // Verify subdirectories were moved up + if _, err := os.Stat(filepath.Join(tmpDir, "00", "testblock")); err != nil { + t.Errorf("Block not migrated: %v", err) + } + if _, err := os.Stat(withCrcPath); !os.IsNotExist(err) { + t.Error("with_crc directory should have been removed") + } +} diff --git a/go/ternblocks/blockservice/countblocks.go b/go/ternblocks/blockservice/countblocks.go new file mode 100644 index 00000000..e7ba6cd8 --- /dev/null +++ b/go/ternblocks/blockservice/countblocks.go @@ -0,0 +1,90 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: GPL-2.0-or-later + +package blockservice + +// #include +// #include +// #include +// #include +// #include +// #include +// #include +// +// struct linux_dirent { +// unsigned long d_ino; +// off_t d_off; +// unsigned short d_reclen; +// char d_name[]; +// }; +// +// // If negative, it's an error code. +// ssize_t count_blocks(char* base_path) { +// int base_fd = -1; +// int dir_fd = -1; +// ssize_t blocks = 0; +// base_fd = open(base_path, O_RDONLY|O_DIRECTORY); +// if (base_fd < 0) { +// fprintf(stderr, "could not open %s: %d\n", base_path, errno); +// blocks = -errno; +// goto out; +// } +// char buf[1024]; +// for (int i = 0; i < 256; i++) { +// char dir_path[3]; +// snprintf(dir_path, 3, "%02x", i); +// if (dir_fd >= 0) { close(dir_fd); } +// dir_fd = openat(base_fd, dir_path, O_RDONLY|O_DIRECTORY); +// if (dir_fd < 0) { +// if (errno == ENOENT) { +// continue; +// } +// fprintf(stderr, "could not open dir %s/%s: %d\n", base_path, dir_path, errno); +// blocks = -errno; +// goto out; +// } +// for (;;) { +// long read = syscall(SYS_getdents, dir_fd, buf, sizeof(buf)); +// if (read < 0) { +// fprintf(stderr, "could not read direntries in %s/%s: %d\n", base_path, dir_path, errno); +// blocks = -errno; +// goto out; +// } +// if (read == 0) { break; } +// long bpos = 0; +// while( bpos < read) { +// struct linux_dirent *entry = (struct linux_dirent *)(buf + bpos); +// bpos += entry->d_reclen; +// if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0 || +// strncmp(entry->d_name, "tmp.", 4) == 0) { +// continue; +// } +// blocks++; +// } +// } +// } +// out: +// if (dir_fd >= 0) { close(dir_fd); } +// if (base_fd >= 0) { close(base_fd); } +// return blocks; +// } +import "C" + +import ( + "syscall" + "unsafe" +) + +// CountBlocks counts the number of block files in a block service directory. +// Done in C to minimize syscall overhead and avoid starving goroutines +// waiting on network I/O. +func CountBlocks(basePath string) (uint64, error) { + cBasePath := C.CString(basePath) + defer C.free(unsafe.Pointer(cBasePath)) + blocks := C.count_blocks(cBasePath) + if blocks < 0 { + return 0, syscall.Errno(-blocks) + } + return uint64(blocks), nil +} diff --git a/go/ternblocks/blockservice/crcstream.go b/go/ternblocks/blockservice/crcstream.go new file mode 100644 index 00000000..c0d28bce --- /dev/null +++ b/go/ternblocks/blockservice/crcstream.go @@ -0,0 +1,165 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: GPL-2.0-or-later + +package blockservice + +import ( + "encoding/binary" + "fmt" + "io" + "xtx/ternfs/core/crc32c" + "xtx/ternfs/msgs" +) + +type pageCrcReader struct { + r io.Reader + streamCrc [4]byte + fullCrc uint32 + pageCrc uint32 + stripCrc bool + totalRead int +} + +// NewPageCrcReader creates a reader that validates page CRCs and optionally strips them. +// Pages are expected to be TERN_PAGE_SIZE bytes followed by 4-byte CRC. +// When stripCrc is true, CRC bytes are removed from the output stream. +func NewPageCrcReader(r io.Reader, stripCrc bool) *pageCrcReader { + return &pageCrcReader{ + r: r, + stripCrc: stripCrc, + } +} + +// GetCrc returns the CRC of everything read so far. +// Valid only when called after reading a complete page (including CRC bytes), error otherwise. +func (c *pageCrcReader) GetCrc() (uint32, error) { + if c.totalRead%int(msgs.TERN_PAGE_WITH_CRC_SIZE) != 0 { + return c.fullCrc, fmt.Errorf("not at the end of a page") + } + return c.fullCrc, nil +} + +// Read implements io.Reader while validating page CRCs. Returns BAD_BLOCK_CRC on mismatch. +// When stripping CRCs, handles cases where caller's buffer doesn't have space for CRC bytes +// by reading and validating them internally without copying to output. +func (c *pageCrcReader) Read(p []byte) (int, error) { + var read int + var readFromSource int + var err error + for read < len(p) { + readFromSource, err = c.r.Read(p[read:]) + for readFromSource > 0 { + offsetInPage := c.totalRead % int(msgs.TERN_PAGE_WITH_CRC_SIZE) + if offsetInPage < int(msgs.TERN_PAGE_SIZE) { + availableInPage := int(msgs.TERN_PAGE_SIZE) - offsetInPage + dataLen := min(availableInPage, readFromSource) + c.pageCrc = crc32c.Sum(c.pageCrc, p[read:read+dataLen]) + c.fullCrc = crc32c.Sum(c.fullCrc, p[read:read+dataLen]) + read += dataLen + readFromSource -= dataLen + c.totalRead += dataLen + continue + } + offsetInCrc := offsetInPage - int(msgs.TERN_PAGE_SIZE) + availableCrcBytes := min(4, readFromSource) + + copy(c.streamCrc[offsetInCrc:], p[read:read+availableCrcBytes]) + read += availableCrcBytes + readFromSource -= availableCrcBytes + c.totalRead += availableCrcBytes + if c.stripCrc { + read -= availableCrcBytes + copy(p[read:], p[read+availableCrcBytes:read+availableCrcBytes+readFromSource]) + } + if offsetInCrc+availableCrcBytes == 4 { + streamCrc := binary.LittleEndian.Uint32(c.streamCrc[:]) + if streamCrc != c.pageCrc { + return read, msgs.BAD_BLOCK_CRC + } + c.pageCrc = 0 + } + } + if err != nil { + break + } + } + // Handle case where the reader wants to strip CRCs and will provide a buffer where + // the last CRC won't fit. We still need to read and validate it. + if c.stripCrc && c.totalRead%int(msgs.TERN_PAGE_WITH_CRC_SIZE) == int(msgs.TERN_PAGE_SIZE) { + if _, err := io.ReadFull(c.r, c.streamCrc[:]); err != nil { + return read, msgs.BAD_BLOCK_CRC + } + c.totalRead += 4 + streamCrc := binary.LittleEndian.Uint32(c.streamCrc[:]) + if streamCrc != c.pageCrc { + return read, msgs.BAD_BLOCK_CRC + } + c.pageCrc = 0 + } + return read, err +} + +func (c *pageCrcReader) Close() error { + if closer, ok := c.r.(io.Closer); ok { + if err := closer.Close(); err != nil { + return err + } + } + if c.totalRead%int(msgs.TERN_PAGE_WITH_CRC_SIZE) != 0 { + return fmt.Errorf("incomplete page read") + } + return nil +} + +type pageCrcWriter struct { + w io.Writer + fullCrc uint32 + pageCrc uint32 + pageOffset int +} + +func NewPageCrcWriter(w io.Writer) *pageCrcWriter { + return &pageCrcWriter{w: w} +} + +// Write implements io.Writer while appending CRCs after each full page. +func (w *pageCrcWriter) Write(p []byte) (int, error) { + var written int + for len(p) > 0 { + toWrite := min(len(p), int(msgs.TERN_PAGE_SIZE)-w.pageOffset) + n, err := w.w.Write(p[:toWrite]) + if err != nil { + return written, err + } + w.pageCrc = crc32c.Sum(w.pageCrc, p[:n]) + w.fullCrc = crc32c.Sum(w.fullCrc, p[:n]) + w.pageOffset += n + written += n + p = p[n:] + if w.pageOffset != int(msgs.TERN_PAGE_SIZE) { + continue + } + var calculatedCrc [4]byte + binary.LittleEndian.PutUint32(calculatedCrc[:], w.pageCrc) + crcSlice := calculatedCrc[:] + for len(crcSlice) > 0 { + m, err := w.w.Write(crcSlice) + if err != nil { + return written, err + } + crcSlice = crcSlice[m:] + } + w.pageOffset = 0 + w.pageCrc = 0 + } + return written, nil +} + +// GetCrc returns the full data CRC when no partial page exists. +func (w *pageCrcWriter) GetCrc() (uint32, error) { + if w.pageOffset != 0 { + return w.fullCrc, fmt.Errorf("incomplete page, cannot get full CRC") + } + return w.fullCrc, nil +} diff --git a/go/ternblocks/blockservice/crcstream_test.go b/go/ternblocks/blockservice/crcstream_test.go new file mode 100644 index 00000000..d070385f --- /dev/null +++ b/go/ternblocks/blockservice/crcstream_test.go @@ -0,0 +1,220 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: GPL-2.0-or-later + +package blockservice + +import ( + "bytes" + "encoding/binary" + "testing" + "xtx/ternfs/core/crc32c" + "xtx/ternfs/msgs" +) + +func TestPageCrcReader(t *testing.T) { + t.Run("valid full page with crc", func(t *testing.T) { + data := bytes.Repeat([]byte{0xAA}, int(msgs.TERN_PAGE_SIZE)) + var crcBuf [4]byte + crc := crc32c.Sum(0, data) + binary.LittleEndian.PutUint32(crcBuf[:], crc) + input := append(data, crcBuf[:]...) + + reader := NewPageCrcReader(bytes.NewReader(input), false) + buf := make([]byte, len(input)) + + n, err := reader.Read(buf) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if n != len(input) { + t.Fatalf("Expected to read %d bytes, got %d", len(input), n) + } + if !bytes.Equal(buf[:len(data)], data) { + t.Error("Data corrupted during read") + } + }) + + t.Run("invalid crc should error", func(t *testing.T) { + data := bytes.Repeat([]byte{0xBB}, int(msgs.TERN_PAGE_SIZE)) + var crcBuf [4]byte + binary.LittleEndian.PutUint32(crcBuf[:], 0xDEADBEEF) + input := append(data, crcBuf[:]...) + + reader := NewPageCrcReader(bytes.NewReader(input), false) + buf := make([]byte, len(input)) + + _, err := reader.Read(buf) + if err != msgs.BAD_BLOCK_CRC { + t.Fatal("Expected BAD_BLOCK_CRC error") + } + }) + + t.Run("strip crc from output", func(t *testing.T) { + data := bytes.Repeat([]byte{0xCC}, int(msgs.TERN_PAGE_SIZE)) + var crcBuf [4]byte + crc := crc32c.Sum(0, data) + binary.LittleEndian.PutUint32(crcBuf[:], crc) + input := append(data, crcBuf[:]...) + + reader := NewPageCrcReader(bytes.NewReader(input), true) + buf := make([]byte, len(data)) + + n, err := reader.Read(buf) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if n != len(data) { + t.Fatalf("Expected to read %d bytes, got %d", len(data), n) + } + if !bytes.Equal(buf, data) { + t.Error("Data corrupted during read") + } + }) + + t.Run("get full crc at page boundary", func(t *testing.T) { + data := bytes.Repeat([]byte{0x11}, int(msgs.TERN_PAGE_SIZE)) + var crcBuf [4]byte + crc := crc32c.Sum(0, data) + binary.LittleEndian.PutUint32(crcBuf[:], crc) + input := append(data, crcBuf[:]...) + + reader := NewPageCrcReader(bytes.NewReader(input), false) + buf := make([]byte, len(input)) + _, err := reader.Read(buf) + if err != nil { + t.Fatal(err) + } + + gotCrc, err := reader.GetCrc() + if err != nil { + t.Fatalf("GetCrc error: %v", err) + } + if gotCrc != crc { + t.Errorf("Expected full CRC %x, got %x", crc, gotCrc) + } + }) + + t.Run("get crc mid-page returns error", func(t *testing.T) { + data := bytes.Repeat([]byte{0x22}, 512) + reader := NewPageCrcReader(bytes.NewReader(data), false) + buf := make([]byte, 512) + _, err := reader.Read(buf) + if err != nil { + t.Fatal(err) + } + + expectedCrc := crc32c.Sum(0, data) + gotCrc, err := reader.GetCrc() + if err == nil { + t.Error("Expected error when getting CRC mid-page") + } + if gotCrc != expectedCrc { + t.Errorf("Expected partial CRC %x, got %x", expectedCrc, gotCrc) + } + }) + + t.Run("multiple pages", func(t *testing.T) { + var input []byte + var allData []byte + for i := range 3 { + data := bytes.Repeat([]byte{byte(i)}, int(msgs.TERN_PAGE_SIZE)) + allData = append(allData, data...) + var crcBuf [4]byte + crc := crc32c.Sum(0, data) + binary.LittleEndian.PutUint32(crcBuf[:], crc) + input = append(input, data...) + input = append(input, crcBuf[:]...) + } + + reader := NewPageCrcReader(bytes.NewReader(input), true) + buf := make([]byte, len(allData)) + n, err := reader.Read(buf) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if n != len(allData) { + t.Fatalf("Expected to read %d bytes, got %d", len(allData), n) + } + if !bytes.Equal(buf, allData) { + t.Error("Data corrupted during multi-page read with CRC stripping") + } + }) +} + +func TestPageCrcWriter(t *testing.T) { + t.Run("write full page with crc", func(t *testing.T) { + var buf bytes.Buffer + writer := NewPageCrcWriter(&buf) + data := bytes.Repeat([]byte{0xDD}, int(msgs.TERN_PAGE_SIZE)) + + n, err := writer.Write(data) + if err != nil { + t.Fatal(err) + } + if n != len(data) { + t.Fatalf("Expected to write %d bytes, got %d", len(data), n) + } + + if buf.Len() != len(data)+4 { + t.Fatalf("Expected %d bytes written, got %d", len(data)+4, buf.Len()) + } + + writtenCrc := binary.LittleEndian.Uint32(buf.Bytes()[len(data):]) + expectedCrc := crc32c.Sum(0, data) + if writtenCrc != expectedCrc { + t.Errorf("CRC mismatch: expected %x, got %x", expectedCrc, writtenCrc) + } + }) + + t.Run("write partial page", func(t *testing.T) { + var buf bytes.Buffer + writer := NewPageCrcWriter(&buf) + data := bytes.Repeat([]byte{0xEE}, 512) + + _, err := writer.Write(data) + if err != nil { + t.Fatal(err) + } + + if buf.Len() != len(data) { + t.Fatalf("Expected %d bytes written, got %d", len(data), buf.Len()) + } + }) + + t.Run("get crc before completion", func(t *testing.T) { + var buf bytes.Buffer + writer := NewPageCrcWriter(&buf) + data := bytes.Repeat([]byte{0xFF}, 512) + + writer.Write(data) + _, err := writer.GetCrc() + if err == nil { + t.Fatal("Expected error when getting CRC from incomplete page") + } + }) + + t.Run("roundtrip write then read", func(t *testing.T) { + data := bytes.Repeat([]byte{0x42}, int(msgs.TERN_PAGE_SIZE)*5) + + var buf bytes.Buffer + writer := NewPageCrcWriter(&buf) + _, err := writer.Write(data) + if err != nil { + t.Fatal(err) + } + + reader := NewPageCrcReader(bytes.NewReader(buf.Bytes()), true) + result := make([]byte, len(data)) + n, err := reader.Read(result) + if err != nil { + t.Fatalf("Read error: %v", err) + } + if n != len(data) { + t.Fatalf("Expected %d bytes, got %d", len(data), n) + } + if !bytes.Equal(data, result) { + t.Error("Roundtrip data mismatch") + } + }) +} diff --git a/go/ternblocks/blockservice/utils.go b/go/ternblocks/blockservice/utils.go new file mode 100644 index 00000000..4ec67f8c --- /dev/null +++ b/go/ternblocks/blockservice/utils.go @@ -0,0 +1,36 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: GPL-2.0-or-later + +package blockservice + +import ( + "os" + "path/filepath" +) + +func moveFileAndSyncDir(src, dst string) error { + err := os.Rename(src, dst) + if err != nil { + return err + } + dir, err := os.Open(filepath.Dir(dst)) + if err != nil { + return err + } + defer dir.Close() + return dir.Sync() +} + +func eraseFileIfExistsAndSyncDir(path string) error { + err := os.Remove(path) + if err != nil { + return err + } + dir, err := os.Open(filepath.Dir(path)) + if err != nil { + return err + } + defer dir.Close() + return dir.Sync() +} From 64db04c3bc1a1d6d54b35ee39542c70af070abba Mon Sep 17 00:00:00 2001 From: Miroslav Crnic Date: Tue, 24 Mar 2026 13:18:05 +0000 Subject: [PATCH 5/5] ternblocks: refactor to use blockservice package --- go/core/managedprocess/managedprocess.go | 6 +- go/ternblocks/blockservice/blockservice.go | 7 +- .../blockservice/blockservice_test.go | 1 + go/ternblocks/ternblocks.go | 1479 ++++------------- go/terntests/terntests.go | 4 +- 5 files changed, 334 insertions(+), 1163 deletions(-) diff --git a/go/core/managedprocess/managedprocess.go b/go/core/managedprocess/managedprocess.go index 715faa33..0db5b322 100644 --- a/go/core/managedprocess/managedprocess.go +++ b/go/core/managedprocess/managedprocess.go @@ -273,7 +273,7 @@ type BlockServiceOpts struct { StorageClasses []msgs.StorageClass FailureDomain string Location msgs.Location - FutureCutoff *time.Duration + EraseCutoff *time.Duration LogLevel log.LogLevel RegistryAddress string Profile bool @@ -300,8 +300,8 @@ func (procs *ManagedProcesses) StartBlockService(ll *log.Logger, opts *BlockServ if opts.Addr2 != "" { args = append(args, "-addr", opts.Addr2) } - if opts.FutureCutoff != nil { - args = append(args, "-future-cutoff", opts.FutureCutoff.String()) + if opts.EraseCutoff != nil { + args = append(args, "-erase-cutoff", opts.EraseCutoff.String()) } if opts.LogLevel == log.DEBUG { args = append(args, "-verbose") diff --git a/go/ternblocks/blockservice/blockservice.go b/go/ternblocks/blockservice/blockservice.go index 1637ab9d..3423a4b7 100644 --- a/go/ternblocks/blockservice/blockservice.go +++ b/go/ternblocks/blockservice/blockservice.go @@ -48,6 +48,7 @@ type BlockServiceOptions struct { MaxConcurrentReads int FutureCutoff time.Duration PastCutoff time.Duration + EraseCutoff time.Duration ReservedCapacityBytes uint64 } @@ -454,10 +455,10 @@ func (b *BlockService) EraseBlock(blockId msgs.BlockId, cert [8]byte) ([8]byte, } now := time.Now() - pastCutoff := now.Add(-b.options.PastCutoff) + eraseCutoff := now.Add(-b.options.EraseCutoff) blockTime := msgs.TernTime(uint64(blockId)).Time() - if blockTime.After(pastCutoff) { + if blockTime.After(eraseCutoff) { return proof, msgs.BLOCK_TOO_RECENT_FOR_DELETION } @@ -468,7 +469,7 @@ func (b *BlockService) EraseBlock(blockId msgs.BlockId, cert [8]byte) ([8]byte, blockPath := path.Join(b.localPath, blockId.Path()) b.logger.Debug("deleting block %v at path %v", blockId, blockPath) err := eraseFileIfExistsAndSyncDir(blockPath) - if err != nil { + if err != nil && !os.IsNotExist(err) { atomic.AddUint64(&b.IoErrors, 1) return proof, err } diff --git a/go/ternblocks/blockservice/blockservice_test.go b/go/ternblocks/blockservice/blockservice_test.go index db6c081c..fbe9928b 100644 --- a/go/ternblocks/blockservice/blockservice_test.go +++ b/go/ternblocks/blockservice/blockservice_test.go @@ -50,6 +50,7 @@ func createTestService(t *testing.T) *BlockService { BufferSize: 1024 * 1024, FutureCutoff: time.Minute, PastCutoff: time.Minute, + EraseCutoff: time.Minute, }, pool, bsInfo, "dev1") deadline := time.Now().Add(5 * time.Second) diff --git a/go/ternblocks/ternblocks.go b/go/ternblocks/ternblocks.go index effcb3ef..013ced59 100644 --- a/go/ternblocks/ternblocks.go +++ b/go/ternblocks/ternblocks.go @@ -8,34 +8,25 @@ import ( "bufio" "bytes" "context" - "crypto/aes" - "crypto/cipher" - crand "crypto/rand" "encoding/binary" "errors" "flag" "fmt" "io" - "math/rand" mrand "math/rand" "net" "os" "os/signal" "path" - "path/filepath" "runtime/pprof" "strconv" "strings" - "sync" "sync/atomic" "syscall" "time" - "unsafe" "xtx/ternfs/client" "xtx/ternfs/core/bufpool" - "xtx/ternfs/core/cbcmac" - "xtx/ternfs/core/certificate" "xtx/ternfs/core/crc32c" "xtx/ternfs/core/flags" "xtx/ternfs/core/log" @@ -43,340 +34,37 @@ import ( "xtx/ternfs/core/timing" "xtx/ternfs/core/wyhash" "xtx/ternfs/msgs" + "xtx/ternfs/ternblocks/blockservice" "golang.org/x/net/ipv4" - "golang.org/x/sys/unix" ) -// #include -// #include -// #include -// #include -// #include -// #include -// #include -// -// struct linux_dirent { -// unsigned long d_ino; -// off_t d_off; -// unsigned short d_reclen; -// char d_name[]; -// }; -// -// // If negative, it's an error code. -// ssize_t count_blocks(char* base_path) { -// int base_fd = -1; -// int dir_fd = -1; -// ssize_t blocks = 0; -// base_fd = open(base_path, O_RDONLY|O_DIRECTORY); -// if (base_fd < 0) { -// fprintf(stderr, "could not open %s: %d\n", base_path, errno); -// blocks = -errno; -// goto out; -// } -// char buf[1024]; -// for (int i = 0; i < 256; i++) { -// char dir_path[3]; -// snprintf(dir_path, 3, "%02x", i); -// if (dir_fd >= 0) { close(dir_fd); } -// dir_fd = openat(base_fd, dir_path, O_RDONLY|O_DIRECTORY); -// if (dir_fd < 0) { -// if (errno == ENOENT) { -// continue; -// } -// fprintf(stderr, "could not open dir %s/%s: %d\n", base_path, dir_path, errno); -// blocks = -errno; -// goto out; -// } -// for (;;) { -// long read = syscall(SYS_getdents, dir_fd, buf, sizeof(buf)); -// if (read < 0) { -// fprintf(stderr, "could not read direntries in %s/%s: %d\n", base_path, dir_path, errno); -// blocks = -errno; -// goto out; -// } -// if (read == 0) { break; } -// long bpos = 0; -// while( bpos < read) { -// struct linux_dirent *entry = (struct linux_dirent *)(buf + bpos); -// bpos += entry->d_reclen; -// if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0 || -// strncmp(entry->d_name, "tmp.", 4) == 0) { -// // we only want to count blocks and ignore . and .. and tmp.* files -// continue; -// } -// blocks++; -// } -// } -// } -// out: -// if (dir_fd >= 0) { close(dir_fd); } -// if (base_fd >= 0) { close(base_fd); } -// return blocks; -// } -import "C" - -type blockServiceStats struct { - blocksWritten uint64 - bytesWritten uint64 - blocksErased uint64 - blocksFetched uint64 - bytesFetched uint64 - blocksChecked uint64 - bytesChecked uint64 - blocksConverted uint64 - blockConversionDiscarded uint64 - blockTooOldForWrite uint64 - badBlockCrc uint64 - badCertificate uint64 +type blockServiceEntry struct { + *blockservice.BlockService + // ternblocks-specific error counters for metrics + badBlockCrc uint64 + blockTooOldForWrite uint64 + badCertificate uint64 + // request counting for IO error rate alerts + requests uint64 + lastIoErrors uint64 + lastRequests uint64 + // alert state + ioErrorsAlert log.XmonNCAlert + decommissioned bool } + +type deadBlockService struct{} + type env struct { bufPool *bufpool.BufPool - stats map[msgs.BlockServiceId]*blockServiceStats counters map[msgs.BlocksMessageKind]*timing.Timings - eraseLocks map[msgs.BlockServiceId]*sync.Mutex registryConn *client.RegistryConn failureDomain string - hostname string pathPrefix string ioAlertPercent uint8 } -func BlockWriteProof(blockServiceId msgs.BlockServiceId, blockId msgs.BlockId, key cipher.Block) [8]byte { - buf := bytes.NewBuffer([]byte{}) - // struct.pack_into(' -func countBlocks(basePath string) (uint64, error) { - cBasePath := C.CString(basePath) - defer C.free(unsafe.Pointer(cBasePath)) - blocks := C.count_blocks(cBasePath) - if blocks < 0 { - return 0, syscall.Errno(-blocks) - } - return uint64(blocks), nil -} - -func updateBlockServiceInfoCapacity( - _ *log.Logger, - blockService *blockService, - reservedStorage uint64, -) error { - var statfs unix.Statfs_t - if err := unix.Statfs(path.Join(blockService.path, "secret.key"), &statfs); err != nil { - return err - } - - capacityBytes := statfs.Blocks * uint64(statfs.Bsize) - if capacityBytes < reservedStorage { - capacityBytes = 0 - } else { - capacityBytes -= reservedStorage - } - availableBytes := statfs.Bavail * uint64(statfs.Bsize) - if availableBytes < reservedStorage { - availableBytes = 0 - } else { - availableBytes -= reservedStorage - } - blockService.cachedInfo.CapacityBytes = capacityBytes - blockService.cachedInfo.AvailableBytes = availableBytes - return nil -} - -// either updates `blockService`, or returns an error. -func updateBlockServiceInfoBlocks( - log *log.Logger, - blockService *blockService, -) error { - t := time.Now() - log.Info("starting to count blocks for %v", blockService.cachedInfo.Id) - blocksWithCrc, err := countBlocks(blockService.path) - if err != nil { - return err - } - - blockService.cachedInfo.Blocks = blocksWithCrc - log.Info("done counting blocks for %v in %v. (blocks: %d)", blockService.cachedInfo.Id, time.Since(t), blocksWithCrc) - return nil -} - -func initBlockServicesInfo( - env *env, - log *log.Logger, - locationId msgs.Location, - addrs msgs.AddrsInfo, - failureDomain [16]byte, - blockServices map[msgs.BlockServiceId]*blockService, - reservedStorage uint64, -) error { - log.Info("initializing block services info") - var wg sync.WaitGroup - wg.Add(len(blockServices)) - alert := log.NewNCAlert(0) - log.RaiseNC(alert, "getting info for %v block services", len(blockServices)) - for id, bs := range blockServices { - bs.cachedInfo.LocationId = locationId - bs.cachedInfo.Id = id - bs.cachedInfo.Addrs = addrs - bs.cachedInfo.SecretKey = bs.key - bs.cachedInfo.StorageClass = bs.storageClass - bs.cachedInfo.FailureDomain.Name = failureDomain - if len(env.pathPrefix) > 0 { - bs.cachedInfo.Path = fmt.Sprintf("%s:%s", env.pathPrefix, bs.path) - } else { - bs.cachedInfo.Path = bs.path - } - closureBs := bs - go func() { - // only update if it isn't filled it in already from registry - if closureBs.cachedInfo.Blocks == 0 { - if err := updateBlockServiceInfoCapacity(log, closureBs, reservedStorage); err != nil { - panic(err) - } - if err := updateBlockServiceInfoBlocks(log, closureBs); err != nil { - panic(err) - } - } - wg.Done() - }() - } - wg.Wait() - log.ClearNC(alert) - return nil -} - -var minimumRegisterInterval time.Duration = time.Second * 60 -var maximumRegisterInterval time.Duration = minimumRegisterInterval * 2 -var variantRegisterInterval time.Duration = maximumRegisterInterval - minimumRegisterInterval - -func registerPeriodically( - log *log.Logger, - blockServices map[msgs.BlockServiceId]*blockService, - env *env, -) { - req := msgs.RegisterBlockServicesReq{} - alert := log.NewNCAlert(10 * time.Second) - failureBackoff := 100 * time.Millisecond - const maxFailureBackoff = 60 * time.Second - registrationCount := 0 - for { - req.BlockServices = req.BlockServices[:0] - for _, bs := range blockServices { - if bs.couldNotUpdateInfoBlocks || bs.couldNotUpdateInfoCapacity { - continue - } - req.BlockServices = append(req.BlockServices, bs.cachedInfo) - } - log.Trace("registering with %+v", req) - _, err := env.registryConn.Request(&req) - if err != nil { - log.RaiseNC(alert, "could not register block services with %+v: %v", env.registryConn.RegistryAddress(), err) - time.Sleep(failureBackoff) - failureBackoff = min(failureBackoff*2, maxFailureBackoff) - continue - } - log.ClearNC(alert) - failureBackoff = 100 * time.Millisecond - registrationCount++ - var waitFor time.Duration - if registrationCount < 3 { - waitFor = 5*time.Second + time.Duration(mrand.Uint64()%uint64((5*time.Second).Nanoseconds())) - } else { - waitFor = minimumRegisterInterval + time.Duration(mrand.Uint64()%uint64(variantRegisterInterval.Nanoseconds())) - } - log.Info("registered with %v (%v alive), waiting %v", env.registryConn.RegistryAddress(), len(blockServices), waitFor) - time.Sleep(waitFor) - } -} - -func updateBlockServiceInfoBlocksForever( - log *log.Logger, - blockServices map[msgs.BlockServiceId]*blockService, -) { - for { - for _, bs := range blockServices { - if err := updateBlockServiceInfoBlocks(log, bs); err != nil { - bs.couldNotUpdateInfoBlocks = true - log.RaiseNC(&bs.couldNotUpdateInfoBlocksAlert, "could not count blocks for block service %v: %v", bs.cachedInfo.Id, err) - } else { - bs.couldNotUpdateInfoBlocks = false - log.ClearNC(&bs.couldNotUpdateInfoBlocksAlert) - } - } - time.Sleep(time.Minute) // so that we won't busy loop in tests etc - } -} - -func updateBlockServiceInfoCapacityForever( - log *log.Logger, - blockServices map[msgs.BlockServiceId]*blockService, - reservedStorage uint64, -) { - for { - for _, bs := range blockServices { - if err := updateBlockServiceInfoCapacity(log, bs, reservedStorage); err != nil { - bs.couldNotUpdateInfoCapacity = true - log.RaiseNC(&bs.couldNotUpdateInfoCapacityAlert, "could not get capacity for block service %v: %v", bs.cachedInfo.Id, err) - } else { - bs.couldNotUpdateInfoCapacity = false - log.ClearNC(&bs.couldNotUpdateInfoCapacityAlert) - } - } - time.Sleep(10 * time.Second) // so that we won't busy loop in tests etc - } -} - -func checkEraseCertificate(log *log.Logger, blockServiceId msgs.BlockServiceId, cipher cipher.Block, req *msgs.EraseBlockReq, stats *blockServiceStats) error { - expectedMac, good := certificate.CheckBlockEraseCertificate(blockServiceId, cipher, req) - if !good { - log.ErrorNoAlert("bad MAC, got %v, expected %v", req.Certificate, expectedMac) - atomic.AddUint64(&stats.badCertificate, 1) - return msgs.BAD_CERTIFICATE - } - return nil -} - -func eraseBlock(log *log.Logger, env *env, blockServiceId msgs.BlockServiceId, basePath string, blockId msgs.BlockId) error { - m := env.eraseLocks[blockServiceId] - m.Lock() - defer m.Unlock() - blockPath := path.Join(basePath, blockId.Path()) - log.Debug("deleting block %v at path %v", blockId, blockPath) - err := eraseFileIfExistsAndSyncDir(blockPath) - if err != nil && !os.IsNotExist(err) { - return err - } - atomic.AddUint64(&env.stats[blockServiceId].blocksErased, 1) - return nil -} - func writeBlocksResponse(log *log.Logger, w io.Writer, resp msgs.BlocksResponse) error { log.Trace("writing response %T %+v", resp, resp) buf := bytes.NewBuffer([]byte{}) @@ -413,421 +101,10 @@ func writeBlocksResponseError(log *log.Logger, w io.Writer, err msgs.TernError) return nil } -type newToOldReadConverter struct { - log *log.Logger - r io.Reader - b []byte - totalRead int - bytesInBuffer int -} - -func (c *newToOldReadConverter) Read(p []byte) (int, error) { - var read int = 0 - for len(p) > 0 { - c.log.Debug("{len_p: %d, read, %d, bytesInBuffer: %d, totalRead %d}", len(p), read, c.bytesInBuffer, c.totalRead) - readFromFile, err := c.r.Read(c.b[c.bytesInBuffer:]) - if err != nil && err != io.EOF { - return read, err - } - c.bytesInBuffer += readFromFile - if c.bytesInBuffer == 0 && err == io.EOF { - return read, io.EOF - } - offsetInBuffer := 0 - for offsetInBuffer < c.bytesInBuffer && len(p) > 0 { - toCopy := c.bytesInBuffer - offsetInBuffer - if toCopy > len(p) { - toCopy = len(p) - } - offSetInPage := c.totalRead % int(msgs.TERN_PAGE_WITH_CRC_SIZE) - availableInPage := int(msgs.TERN_PAGE_SIZE) - offSetInPage - if toCopy > availableInPage { - toCopy = availableInPage - } - c.log.Debug("{toCopy: %d, offSetInPage: %d, availableInPage: %d, offsetInBuffer: %d, bytesInBuffer: %d}", toCopy, offSetInPage, availableInPage, offsetInBuffer, c.bytesInBuffer) - copy(p, c.b[offsetInBuffer:offsetInBuffer+toCopy]) - c.totalRead += toCopy - read += toCopy - p = p[toCopy:] - offsetInBuffer += toCopy - offSetInPage += toCopy - if offSetInPage == int(msgs.TERN_PAGE_SIZE) { - if c.bytesInBuffer-offsetInBuffer < 4 { - break - } - c.totalRead += 4 - offsetInBuffer += 4 - } - } - copy(c.b, c.b[offsetInBuffer:c.bytesInBuffer]) - c.bytesInBuffer -= offsetInBuffer - } - c.log.Debug("{len_p: %d, read: %d", len(p), read) - return read, nil -} - -func sendFetchBlock(log *log.Logger, env *env, blockServiceId msgs.BlockServiceId, basePath string, blockId msgs.BlockId, offset uint32, count uint32, conn *net.TCPConn, withCrc bool, storageClass msgs.StorageClass) error { - if offset%msgs.TERN_PAGE_SIZE != 0 { - log.RaiseAlert("trying to read from offset other than page boundary") - return msgs.BLOCK_FETCH_OUT_OF_BOUNDS - } - if count%msgs.TERN_PAGE_SIZE != 0 { - log.RaiseAlert("trying to read count which is not a multiple of page size") - return msgs.BLOCK_FETCH_OUT_OF_BOUNDS - } - pageCount := count / msgs.TERN_PAGE_SIZE - offsetPageCount := offset / msgs.TERN_PAGE_SIZE - blockPath := path.Join(basePath, blockId.Path()) - log.Debug("fetching block id %v (%v -> %v) at path %v", blockId, offset, count, blockPath) - f, err := os.Open(blockPath) - - if errors.Is(err, syscall.ENODATA) { - // see - raiseAlertAndHardwareEvent(log, env.hostname, blockServiceId.String(), - fmt.Sprintf("could not open block %v, got ENODATA, this probably means that the block/disk is gone", blockPath)) - // return io error, downstream code will pick it up - return syscall.EIO - } - - if os.IsNotExist(err) { - log.ErrorNoAlert("could not find block to fetch at path %v. Request from client: %v", blockPath, conn.RemoteAddr()) - return msgs.BLOCK_NOT_FOUND - } - - if err != nil { - return err - } - defer func() { - if f != nil { - f.Close() - } - }() - fi, err := f.Stat() - if err != nil { - return err - } - filePageCount := uint32(fi.Size()) / msgs.TERN_PAGE_WITH_CRC_SIZE - if offsetPageCount+pageCount > filePageCount { - log.RaiseAlert("malformed request for block %v. requested read at [%d - %d] but stored block size is %d", blockId, offset, offset+count, filePageCount*msgs.TERN_PAGE_SIZE) - return msgs.BLOCK_FETCH_OUT_OF_BOUNDS - } - // Decide whether to read ahead based on storage class: always for HDD, never for FLASH. - // Read-ahead is crucial for HDD performance because sequential reads are much faster than random access, - // so reading the whole file amortizes seek time. For FLASH/SSD, random access is fast enough that - // read-ahead provides no benefit and just wastes I/O bandwidth and memory. - readAhead := storageClass == msgs.HDD_STORAGE - var reader io.ReadSeeker = f - - if withCrc { - offset = offsetPageCount * msgs.TERN_PAGE_WITH_CRC_SIZE - count = pageCount * msgs.TERN_PAGE_WITH_CRC_SIZE - // Only issue fadvise syscall if we're reading ahead - if readAhead { - unix.Fadvise(int(f.Fd()), int64(offset), fi.Size(), unix.FADV_SEQUENTIAL|unix.FADV_WILLNEED) - } - - if _, err := reader.Seek(int64(offset), 0); err != nil { - return err - } - var resp msgs.BlocksResponse = &msgs.FetchBlockResp{} - if withCrc { - resp = &msgs.FetchBlockWithCrcResp{} - } - - if err := writeBlocksResponse(log, conn, resp); err != nil { - return err - } - lf := io.LimitedReader{ - R: reader, - N: int64(count), - } - - read, err := conn.ReadFrom(&lf) - if err != nil { - return err - } - if read != int64(count) { - log.RaiseAlert("expected to read at least %v bytes, but only got %v for file %q", count, read, blockPath) - return msgs.INTERNAL_ERROR - } - } else { - // the only remaining case is that we have a file in new format and client wants old format - offset = offsetPageCount * msgs.TERN_PAGE_WITH_CRC_SIZE - // Only issue fadvise syscall if we're reading ahead - if readAhead { - unix.Fadvise(int(f.Fd()), int64(offset), fi.Size(), unix.FADV_SEQUENTIAL|unix.FADV_WILLNEED) - } - if _, err := reader.Seek(int64(offset), 0); err != nil { - return err - } - if err := writeBlocksResponse(log, conn, &msgs.FetchBlockResp{}); err != nil { - return err - } - buf := env.bufPool.Get(1 << 20) - defer env.bufPool.Put(buf) - converter := newToOldReadConverter{ - log: log, - r: reader, - b: buf.Bytes(), - totalRead: 0, - bytesInBuffer: 0, - } - lf := io.LimitedReader{ - R: &converter, - N: int64(count), - } - read, err := conn.ReadFrom(&lf) - if err != nil { - return err - } - if read != int64(count) { - log.RaiseAlert("expected to read at least %v bytes, but only got %v for file %q", count, read, blockPath) - return msgs.INTERNAL_ERROR - } - } - - s := env.stats[blockServiceId] - atomic.AddUint64(&s.blocksFetched, 1) - atomic.AddUint64(&s.bytesFetched, uint64(count)) - return nil -} - -func getPhysicalBlockSize(path string) (int, error) { - fs := syscall.Statfs_t{} - err := syscall.Statfs(path, &fs) - if err != nil { - return 0, err - } - return int(fs.Bsize), nil -} - -func checkBlock(log *log.Logger, env *env, blockServiceId msgs.BlockServiceId, basePath string, blockId msgs.BlockId, expectedSize uint32, crc msgs.Crc, conn *net.TCPConn) error { - blockPath := path.Join(basePath, blockId.Path()) - log.Debug("checking block id %v at path %v", blockId, blockPath) - - // we try to open no crc first as this file is deleted only after file with crc is created - // if it doesn't exist in both places then it really is deleted - f, err := os.Open(blockPath) - - if errors.Is(err, syscall.ENODATA) { - // see - raiseAlertAndHardwareEvent(log, env.failureDomain, blockServiceId.String(), - fmt.Sprintf("could not open block %v, got ENODATA, this probably means that the block/disk is gone", blockPath)) - // return io error, downstream code will pick it up - return syscall.EIO - } - if os.IsNotExist(err) { - return msgs.BLOCK_NOT_FOUND - } - if err != nil { - return err - } - defer f.Close() - fi, err := f.Stat() - if err != nil { - log.ErrorNoAlert("could not read file %v : %v", blockPath, err) - return err - } - s := env.stats[blockServiceId] - atomic.AddUint64(&s.blocksChecked, 1) - atomic.AddUint64(&s.bytesChecked, uint64(expectedSize)) - - if uint32(fi.Size())%msgs.TERN_PAGE_WITH_CRC_SIZE != 0 { - log.ErrorNoAlert("size %v for block %v, not multiple of TERN_PAGE_WITH_CRC_SIZE", uint32(fi.Size()), blockPath) - return msgs.BAD_BLOCK_CRC - } - actualDataSize := (uint32(fi.Size()) / msgs.TERN_PAGE_WITH_CRC_SIZE) * msgs.TERN_PAGE_SIZE - if actualDataSize != expectedSize { - log.ErrorNoAlert("size %v for block %v, not equal to expected size %v", actualDataSize, blockPath, expectedSize) - return msgs.BAD_BLOCK_CRC - } - bufPtr := env.bufPool.Get(1 << 20) - defer env.bufPool.Put(bufPtr) - err = verifyCrcReader(log, bufPtr.Bytes(), f, crc) - - if errors.Is(err, syscall.ENODATA) { - // see - raiseAlertAndHardwareEvent(log, env.failureDomain, blockServiceId.String(), - fmt.Sprintf("could not open block %v, got ENODATA, this probably means that the block/disk is gone", blockPath)) - // return io error, downstream code will pick it up - return syscall.EIO - } - if os.IsNotExist(err) { - return msgs.BLOCK_NOT_FOUND - } - if err != nil { - return err - } - if err := writeBlocksResponse(log, conn, &msgs.CheckBlockResp{}); err != nil { - return err - } - return nil -} - -func checkWriteCertificate(log *log.Logger, cipher cipher.Block, blockServiceId msgs.BlockServiceId, req *msgs.WriteBlockReq, stats *blockServiceStats) error { - expectedMac, good := certificate.CheckBlockWriteCertificate(cipher, blockServiceId, req) - if !good { - log.ErrorNoAlert("bad MAC computed for %v %v %v %v, expected %v, got %v ", blockServiceId, req.BlockId, req.Crc, req.Size, expectedMac, req.Certificate) - atomic.AddUint64(&stats.badCertificate, 1) - return msgs.BAD_CERTIFICATE - } - return nil -} - -func writeToBuf(log *log.Logger, env *env, reader io.Reader, size int64) (*bufpool.Buf, error) { - readBufPtr := env.bufPool.Get(1 << 20) - defer env.bufPool.Put(readBufPtr) - readBuffer := readBufPtr.Bytes() - var err error - - writeButPtr := env.bufPool.Get(int(size / int64(msgs.TERN_PAGE_SIZE) * int64(msgs.TERN_PAGE_WITH_CRC_SIZE))) - writeBuffer := writeButPtr.Bytes() - defer func() { - if err != nil { - env.bufPool.Put(writeButPtr) - } - }() - readerHasMoreData := true - dataInReadBuffer := 0 - dataInWriteBuffer := 0 - endReadOffset := len(readBuffer) - for readerHasMoreData && size > int64(dataInReadBuffer) { - if int64(endReadOffset) > size { - endReadOffset = int(size) - } - - read, err := reader.Read(readBuffer[dataInReadBuffer:endReadOffset]) - if err != nil { - if err == io.EOF { - readerHasMoreData = false - err = nil - } else { - return nil, err - } - } - dataInReadBuffer += read - if dataInReadBuffer < int(msgs.TERN_PAGE_SIZE) { - continue - } - availablePages := dataInReadBuffer / int(msgs.TERN_PAGE_SIZE) - for i := 0; i < availablePages; i++ { - page := readBuffer[i*int(msgs.TERN_PAGE_SIZE) : (i+1)*int(msgs.TERN_PAGE_SIZE)] - dataInWriteBuffer += copy(writeBuffer[dataInWriteBuffer:], page) - pageCRC := crc32c.Sum(0, page) - binary.LittleEndian.PutUint32(writeBuffer[dataInWriteBuffer:dataInWriteBuffer+4], pageCRC) - dataInWriteBuffer += 4 - } - size -= int64(availablePages) * int64(msgs.TERN_PAGE_SIZE) - dataInReadBuffer = copy(readBuffer[:], readBuffer[availablePages*int(msgs.TERN_PAGE_SIZE):dataInReadBuffer]) - } - if !readerHasMoreData && (size-int64(dataInReadBuffer) > 0) { - log.Debug("failed converting block, reached EOF in input stream, missing %d bytes", size-int64(dataInReadBuffer)) - err = io.EOF - return nil, err - } - if dataInReadBuffer != 0 || size != 0 { - log.Debug("failed converting block, unexpected data size. left in read buffer %d, remaining size %d", dataInReadBuffer, size) - err = msgs.BAD_BLOCK_CRC - return nil, err - } - if dataInWriteBuffer != len(writeBuffer) { - log.Debug("failed converting block, unexpected write buffer size. expected %d, got %d", len(writeBuffer), dataInWriteBuffer) - err = msgs.BAD_BLOCK_CRC - return nil, err - } - return writeButPtr, nil -} - -func writeBlockInternal( - log *log.Logger, - env *env, - reader io.LimitedReader, - blockServiceId msgs.BlockServiceId, - expectedCrc msgs.Crc, - blockId msgs.BlockId, - filePath string, -) error { - // We don't check CRC here, we fully check tmpFile after it has been written and synced - bufPtr, err := writeToBuf(log, env, reader.R, reader.N) - if err != nil { - if err == msgs.BAD_BLOCK_CRC { - atomic.AddUint64(&env.stats[blockServiceId].badBlockCrc, 1) - } - return err - } - defer env.bufPool.Put(bufPtr) - - tmpFile, err := writeBufToTemp(&env.stats[blockServiceId].bytesWritten, path.Dir(filePath), bufPtr.Bytes()) - if err != nil { - return err - } - defer os.Remove(tmpFile) - readBufPtr := env.bufPool.Get(1 << 20) - defer env.bufPool.Put(readBufPtr) - err = verifyCrcFile(log, readBufPtr.Bytes(), tmpFile, int64(len(bufPtr.Bytes())), expectedCrc) - if err != nil { - log.ErrorNoAlert("failed writing block %v in blockservice %v with error : %v", blockId, blockServiceId, err) - if err == msgs.BAD_BLOCK_CRC { - atomic.AddUint64(&env.stats[blockServiceId].badBlockCrc, 1) - } - return err - } - err = moveFileAndSyncDir(tmpFile, filePath) - if err != nil { - log.ErrorNoAlert("failed writing block %v in blockservice %v with error : %v", blockId, blockServiceId, err) - return err - } - return nil -} - -func writeBlock( - log *log.Logger, - env *env, - blockServiceId msgs.BlockServiceId, cipher cipher.Block, basePath string, - blockId msgs.BlockId, expectedCrc msgs.Crc, size uint32, conn *net.TCPConn, -) error { - filePath := path.Join(basePath, blockId.Path()) - log.Debug("writing block %v at path %v", blockId, basePath) - - err := writeBlockInternal(log, env, io.LimitedReader{R: conn, N: int64(size)}, blockServiceId, expectedCrc, blockId, filePath) - if err != nil { - return err - } - - log.Debug("writing proof") - if err := writeBlocksResponse(log, conn, &msgs.WriteBlockResp{Proof: BlockWriteProof(blockServiceId, blockId, cipher)}); err != nil { - return err - } - atomic.AddUint64(&env.stats[blockServiceId].blocksWritten, 1) - return nil -} - -func testWrite( - log *log.Logger, env *env, blockServiceId msgs.BlockServiceId, basePath string, size uint64, conn *net.TCPConn, -) error { - filePath := path.Join(basePath, fmt.Sprintf("tmp.test-write%d", rand.Int63())) - defer os.Remove(filePath) - err := writeBlockInternal(log, env, io.LimitedReader{R: conn, N: int64(size)}, blockServiceId, 0, msgs.BlockId(0), filePath) - if err != nil { - return err - } - - if err := writeBlocksResponse(log, conn, &msgs.TestWriteResp{}); err != nil { - return err - } - return nil -} - -const PAST_CUTOFF time.Duration = 22 * time.Hour -const DEFAULT_FUTURE_CUTOFF time.Duration = 1 * time.Hour -const WRITE_FUTURE_CUTOFF time.Duration = 5 * time.Minute - -const MAX_OBJECT_SIZE uint32 = 100 << 20 - -// The bool is whether we should keep going +// handleRequestError decides whether to keep the connection alive after an error. func handleRequestError( log *log.Logger, - blockServices map[msgs.BlockServiceId]*blockService, + blockServices map[msgs.BlockServiceId]*blockServiceEntry, deadBlockServices map[msgs.BlockServiceId]deadBlockService, conn *net.TCPConn, lastError *error, @@ -869,13 +146,12 @@ func handleRequestError( } if errors.Is(err, syscall.EIO) && blockServiceId != 0 { - blockService := blockServices[blockServiceId] - if blockService.couldNotUpdateInfoBlocks || blockService.couldNotUpdateInfoCapacity { + entry := blockServices[blockServiceId] + if entry != nil && entry.ToDecommission() { err = msgs.BLOCK_IO_ERROR_DEVICE } else { err = msgs.BLOCK_IO_ERROR_FILE } - atomic.AddUint64(&blockService.ioErrors, 1) log.ErrorNoAlert("got unxpected IO error %v from %v for req kind %v, block service %v, will return %v, previous error: %v", err, conn.RemoteAddr(), req, blockServiceId, err, *lastError) writeBlocksResponseError(log, conn, err.(msgs.TernError)) return false @@ -909,8 +185,7 @@ func handleRequestError( return false } // Keep the connection around using a whitelist of conditions where we know - // that the stream is safe. Right now I just added one case which I know - // is safe, we can add others conservatively in the future if we wish to. + // that the stream is safe. safeError := false safeError = safeError || ((req == msgs.CHECK_BLOCK || req == msgs.FETCH_BLOCK || req == msgs.FETCH_BLOCK_WITH_CRC) && ternErr == msgs.BLOCK_NOT_FOUND) if safeError { @@ -928,8 +203,6 @@ func handleRequestError( } } -type deadBlockService struct{} - func readBlocksRequest( log *log.Logger, r io.Reader, @@ -975,16 +248,26 @@ func readBlocksRequest( return msgs.BlockServiceId(blockServiceId), req, nil } -// The bool tells us whether we should keep going +func classifyError(err error, entry *blockServiceEntry) { + switch err { + case msgs.BAD_BLOCK_CRC: + atomic.AddUint64(&entry.badBlockCrc, 1) + case msgs.BLOCK_TOO_OLD_FOR_WRITE: + atomic.AddUint64(&entry.blockTooOldForWrite, 1) + case msgs.BAD_CERTIFICATE: + atomic.AddUint64(&entry.badCertificate, 1) + } +} + +// handleSingleRequest processes one request and returns whether to keep the connection. func handleSingleRequest( log *log.Logger, env *env, _ chan any, lastError *error, - blockServices map[msgs.BlockServiceId]*blockService, + blockServices map[msgs.BlockServiceId]*blockServiceEntry, deadBlockServices map[msgs.BlockServiceId]deadBlockService, conn *net.TCPConn, - futureCutoff time.Duration, connectionTimeout time.Duration, ) bool { if connectionTimeout != 0 { @@ -1003,88 +286,99 @@ func handleSingleRequest( log.Trace("req %+v", req) defer log.Debug("serviced request of type %T from %v", req, conn.RemoteAddr()) if connectionTimeout != 0 { - // Reset timeout, with default settings this will give - // the request a minute to complete, given that the max - // block size is 10MiB, that is ~0.17MiB/s, so it should - // be plenty of time unless something is wrong. - // - // If we didn't reset this (or just remove the timeout) - // the previous timeout might very well trip the request - // because it might have been almost expired. conn.SetDeadline(time.Now().Add(connectionTimeout)) } - blockService, found := blockServices[blockServiceId] + entry, found := blockServices[blockServiceId] if !found { - // In general, refuse to service requests for block services that we - // don't have. return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, msgs.BLOCK_SERVICE_NOT_FOUND) } - atomic.AddUint64(&blockService.requests, 1) + atomic.AddUint64(&entry.requests, 1) + bs := entry.BlockService + switch whichReq := req.(type) { case *msgs.EraseBlockReq: - if err := checkEraseCertificate(log, blockServiceId, blockService.cipher, whichReq, env.stats[blockServiceId]); err != nil { + proof, err := bs.EraseBlock(whichReq.BlockId, whichReq.Certificate) + if err != nil { + classifyError(err, entry) return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, err) } - cutoffTime := msgs.TernTime(uint64(whichReq.BlockId)).Time().Add(futureCutoff) - now := time.Now() - if now.Before(cutoffTime) { - log.ErrorNoAlert("block %v is too recent to be deleted (now=%v, cutoffTime=%v)", whichReq.BlockId, now, cutoffTime) - return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, msgs.BLOCK_TOO_RECENT_FOR_DELETION) - } - if err := eraseBlock(log, env, blockServiceId, blockService.path, whichReq.BlockId); err != nil { + if err := writeBlocksResponse(log, conn, &msgs.EraseBlockResp{Proof: proof}); err != nil { + log.Info("could not send blocks response to %v: %v", conn.RemoteAddr(), err) return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, err) } - resp := msgs.EraseBlockResp{ - Proof: certificate.BlockEraseProof(blockServiceId, whichReq.BlockId, blockService.cipher), - } - if err := writeBlocksResponse(log, conn, &resp); err != nil { - log.Info("could not send blocks response to %v: %v", conn.RemoteAddr(), err) + case *msgs.FetchBlockWithCrcReq: + readAhead := bs.StorageClass == msgs.HDD_STORAGE + f, byteCount, err := bs.GetBlockFileForFetch(whichReq.BlockId, whichReq.Offset, whichReq.Count, readAhead) + if err != nil { return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, err) } - case *msgs.FetchBlockReq: - if err := sendFetchBlock(log, env, blockServiceId, blockService.path, whichReq.BlockId, whichReq.Offset, whichReq.Count, conn, false, blockService.storageClass); err != nil { - log.Info("could not send block response to %v: %v", conn.RemoteAddr(), err) + defer func() { + f.Close() + bs.ReleaseBlockFile() + }() + if err := writeBlocksResponse(log, conn, &msgs.FetchBlockWithCrcResp{}); err != nil { return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, err) } - case *msgs.FetchBlockWithCrcReq: - if err := sendFetchBlock(log, env, blockServiceId, blockService.path, whichReq.BlockId, whichReq.Offset, whichReq.Count, conn, true, blockService.storageClass); err != nil { - log.Info("could not send block response to %v: %v", conn.RemoteAddr(), err) + lf := io.LimitedReader{R: f, N: byteCount} + read, err := conn.ReadFrom(&lf) + if err != nil { return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, err) } - case *msgs.WriteBlockReq: - pastCutoffTime := msgs.TernTime(uint64(whichReq.BlockId)).Time().Add(-PAST_CUTOFF) - futureCutoffTime := msgs.TernTime(uint64(whichReq.BlockId)).Time().Add(WRITE_FUTURE_CUTOFF) - now := time.Now() - if now.Before(pastCutoffTime) { - panic(fmt.Errorf("block %v is in the future! (now=%v, pastCutoffTime=%v)", whichReq.BlockId, now, pastCutoffTime)) + if read != byteCount { + log.RaiseAlert("expected to read at least %v bytes, but only got %v for block %v", byteCount, read, whichReq.BlockId) + return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, msgs.INTERNAL_ERROR) } - if now.After(futureCutoffTime) { - log.ErrorNoAlert("block %v is too old to be written (now=%v, futureCutoffTime=%v)", whichReq.BlockId, now, futureCutoffTime) - atomic.AddUint64(&env.stats[blockServiceId].blockTooOldForWrite, 1) - return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, msgs.BLOCK_TOO_OLD_FOR_WRITE) + + case *msgs.FetchBlockReq: + readAhead := bs.StorageClass == msgs.HDD_STORAGE + reader, err := bs.GetBlockReader(whichReq.BlockId, whichReq.Offset, whichReq.Count, readAhead, true) + if err != nil { + return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, err) } - if err := checkWriteCertificate(log, blockService.cipher, blockServiceId, whichReq, env.stats[blockServiceId]); err != nil { + defer reader.Close() + if err := writeBlocksResponse(log, conn, &msgs.FetchBlockResp{}); err != nil { return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, err) } - if whichReq.Size > MAX_OBJECT_SIZE { - log.RaiseAlert("block %v exceeds max object size: %v > %v", whichReq.BlockId, whichReq.Size, MAX_OBJECT_SIZE) - return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, msgs.BLOCK_TOO_BIG) + lf := io.LimitedReader{R: reader, N: int64(whichReq.Count)} + read, err := conn.ReadFrom(&lf) + if err != nil { + return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, err) } - if err := writeBlock(log, env, blockServiceId, blockService.cipher, blockService.path, whichReq.BlockId, whichReq.Crc, whichReq.Size, conn); err != nil { + if read != int64(whichReq.Count) { + log.RaiseAlert("expected to read at least %v bytes, but only got %v for block %v", whichReq.Count, read, whichReq.BlockId) + return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, msgs.INTERNAL_ERROR) + } + + case *msgs.WriteBlockReq: + proof, err := bs.WriteBlock(whichReq.BlockId, whichReq.Certificate, whichReq.Crc, whichReq.Size, conn) + if err != nil { + classifyError(err, entry) log.Info("could not write block: %v", err) return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, err) } + if err := writeBlocksResponse(log, conn, &msgs.WriteBlockResp{Proof: proof}); err != nil { + return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, err) + } + case *msgs.CheckBlockReq: - if err := checkBlock(log, env, blockServiceId, blockService.path, whichReq.BlockId, whichReq.Size, whichReq.Crc, conn); err != nil { + if err := bs.CheckBlock(whichReq.BlockId, whichReq.Size, whichReq.Crc); err != nil { log.Info("checking block failed, conn %v, err %v", conn.RemoteAddr(), err) return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, err) } + if err := writeBlocksResponse(log, conn, &msgs.CheckBlockResp{}); err != nil { + return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, err) + } + case *msgs.TestWriteReq: - if err := testWrite(log, env, blockServiceId, blockService.path, whichReq.Size, conn); err != nil { + if err := bs.TestWrite(uint32(whichReq.Size), conn); err != nil { log.Info("could not perform test write: %v", err) return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, err) } + if err := writeBlocksResponse(log, conn, &msgs.TestWriteResp{}); err != nil { + return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, err) + } + default: return handleRequestError(log, blockServices, deadBlockServices, conn, lastError, blockServiceId, kind, fmt.Errorf("bad request type %T", req)) } @@ -1095,84 +389,95 @@ func handleRequest( log *log.Logger, env *env, terminateChan chan any, - blockServices map[msgs.BlockServiceId]*blockService, + blockServices map[msgs.BlockServiceId]*blockServiceEntry, deadBlockServices map[msgs.BlockServiceId]deadBlockService, conn *net.TCPConn, - futureCutoff time.Duration, connectionTimeout time.Duration, ) { defer conn.Close() var lastError error - for { - keepGoing := handleSingleRequest(log, env, terminateChan, &lastError, blockServices, deadBlockServices, conn, futureCutoff, connectionTimeout) - if !keepGoing { - return - } - } -} - -func usage() { - fmt.Fprintf(os.Stderr, "Usage: %v DIRECTORY STORAGE_CLASS...\n\n", os.Args[0]) - description := ` -For each directory/storage class pair specified we'll have one block -service. The block service id for each will be automatically generated -when running for the first time. The failure domain will be the same. - -The intention is that a single blockservice process will service -a storage node. - -Options:` - description = strings.TrimSpace(description) - fmt.Fprintln(os.Stderr, description) - flag.PrintDefaults() -} - -func retrieveOrCreateKey(log *log.Logger, dir string) ([16]byte, error) { - var err error - var keyFile *os.File - keyFilePath := path.Join(dir, "secret.key") - keyFile, err = os.OpenFile(keyFilePath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) - if err != nil { - return [16]byte{}, fmt.Errorf("could not open or create key file %v: %v", keyFilePath, err) - } - keyFile.Seek(0, 0) - if err := syscall.Flock(int(keyFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil { - return [16]byte{}, fmt.Errorf("could not lock key file %v: %v", keyFilePath, err) - } - var key [16]byte - var read int - read, err = keyFile.Read(key[:]) - if err != nil && err != io.EOF { - return [16]byte{}, fmt.Errorf("could not read key file %v: %v", keyFilePath, err) - } - if err == io.EOF { - log.Info("creating new secret key") - if _, err := crand.Read(key[:]); err != nil { - panic(err) + for { + keepGoing := handleSingleRequest(log, env, terminateChan, &lastError, blockServices, deadBlockServices, conn, connectionTimeout) + if !keepGoing { + return } - if _, err := keyFile.Write(key[:]); err != nil { - panic(err) + } +} + +var minimumRegisterInterval time.Duration = time.Second * 60 +var maximumRegisterInterval time.Duration = minimumRegisterInterval * 2 +var variantRegisterInterval time.Duration = maximumRegisterInterval - minimumRegisterInterval + +func registerPeriodically( + log *log.Logger, + blockServices map[msgs.BlockServiceId]*blockServiceEntry, + env *env, +) { + req := msgs.RegisterBlockServicesReq{} + alert := log.NewNCAlert(10 * time.Second) + failureBackoff := 100 * time.Millisecond + const maxFailureBackoff = 60 * time.Second + registrationCount := 0 + for { + req.BlockServices = req.BlockServices[:0] + for _, entry := range blockServices { + if entry.ToDecommission() || entry.decommissioned { + continue + } + req.BlockServices = append(req.BlockServices, entry.RegisterBlockServiceInfo) } - keyCrc := crc32c.Sum(0, key[:]) - if err := binary.Write(keyFile, binary.LittleEndian, keyCrc); err != nil { - panic(err) + log.Trace("registering with %+v", req) + _, err := env.registryConn.Request(&req) + if err != nil { + log.RaiseNC(alert, "could not register block services with %+v: %v", env.registryConn.RegistryAddress(), err) + time.Sleep(failureBackoff) + failureBackoff = min(failureBackoff*2, maxFailureBackoff) + continue } - log.Info("creating directory structure") - } else if read != 16 { - return [16]byte{}, fmt.Errorf("short secret key (%v rather than 16 bytes)", read) - } else { - expectedKeyCrc := crc32c.Sum(0, key[:]) - var actualKeyCrc uint32 - if err := binary.Read(keyFile, binary.LittleEndian, &actualKeyCrc); err != nil { - return [16]byte{}, fmt.Errorf("could not read crc from key file %v: %v", keyFilePath, err) + log.ClearNC(alert) + failureBackoff = 100 * time.Millisecond + registrationCount++ + var waitFor time.Duration + if registrationCount < 3 { + waitFor = 5*time.Second + time.Duration(mrand.Uint64()%uint64((5*time.Second).Nanoseconds())) + } else { + waitFor = minimumRegisterInterval + time.Duration(mrand.Uint64()%uint64(variantRegisterInterval.Nanoseconds())) } - if expectedKeyCrc != actualKeyCrc { - return [16]byte{}, fmt.Errorf("expected crc %v, got %v", msgs.Crc(expectedKeyCrc), msgs.Crc(actualKeyCrc)) + log.Info("registered with %v (%v alive), waiting %v", env.registryConn.RegistryAddress(), len(blockServices), waitFor) + time.Sleep(waitFor) + } +} + +func raiseAlerts(log *log.Logger, env *env, blockServices map[msgs.BlockServiceId]*blockServiceEntry) { + for { + for bsId, entry := range blockServices { + if entry.decommissioned { + continue + } + ioErrors := entry.lastIoErrors + requests := entry.lastRequests + entry.lastIoErrors = atomic.LoadUint64(&entry.IoErrors) + entry.lastRequests = atomic.LoadUint64(&entry.requests) + ioErrors = entry.lastIoErrors - ioErrors + requests = entry.lastRequests - requests + if requests*uint64(env.ioAlertPercent) < ioErrors*100 { + log.Info("block service %v had %v ioErrors from %v requests in the last 5 minutes (over %d%% threshold), requesting decommission", bsId, ioErrors, requests, env.ioAlertPercent) + _, err := env.registryConn.Request(&msgs.DecommissionBlockServiceReq{Id: entry.Id}) + if err != nil { + log.RaiseNC(&entry.ioErrorsAlert, "block service %v had %v ioErrors from %v requests in the last 5 minutes (over %d%% threshold), decommission failed: %v", bsId, ioErrors, requests, env.ioAlertPercent, err) + } else { + entry.decommissioned = true + log.ClearNC(&entry.ioErrorsAlert) + log.Info("block service %v decommissioned successfully", entry.Id) + } + } else { + log.ClearNC(&entry.ioErrorsAlert) + } } + time.Sleep(5 * time.Minute) } - return key, nil } type diskStats struct { @@ -1198,7 +503,6 @@ func getDiskStats(log *log.Logger, statsPath string) (map[string]diskStats, erro continue } devId := fmt.Sprintf("%s:%s", fields[0], fields[1]) - // https://www.kernel.org/doc/html/v5.4/admin-guide/iostats.html readMs, err := strconv.ParseUint(fields[6], 10, 64) if err != nil { return nil, fmt.Errorf("failed to parse reads ms from %s", line) @@ -1221,37 +525,7 @@ func getDiskStats(log *log.Logger, statsPath string) (map[string]diskStats, erro return ret, nil } -func raiseAlerts(log *log.Logger, env *env, blockServices map[msgs.BlockServiceId]*blockService) { - for { - for bsId, bs := range blockServices { - if bs.decommissioned { - continue - } - ioErrors := bs.lastIoErrors - requests := bs.lastRequests - bs.lastIoErrors = atomic.LoadUint64(&bs.ioErrors) - bs.lastRequests = atomic.LoadUint64(&bs.requests) - ioErrors = bs.lastIoErrors - ioErrors - requests = bs.lastRequests - requests - if requests*uint64(env.ioAlertPercent) < ioErrors*100 { - log.Info("block service %v had %v ioErrors from %v requests in the last 5 minutes (over %d%% threshold), requesting decommission", bsId, ioErrors, requests, env.ioAlertPercent) - _, err := env.registryConn.Request(&msgs.DecommissionBlockServiceReq{Id: bs.cachedInfo.Id}) - if err != nil { - log.RaiseNC(&bs.ioErrorsAlert, "block service %v had %v ioErrors from %v requests in the last 5 minutes (over %d%% threshold), decommission failed: %v", bsId, ioErrors, requests, env.ioAlertPercent, err) - } else { - bs.decommissioned = true - log.ClearNC(&bs.ioErrorsAlert) - log.Info("block service %v decommissioned successfully", bs.cachedInfo.Id) - } - } else { - log.ClearNC(&bs.ioErrorsAlert) - } - } - time.Sleep(5 * time.Minute) - } -} - -func sendMetrics(l *log.Logger, env *env, influxDB *log.InfluxDB, blockServices map[msgs.BlockServiceId]*blockService, failureDomain string) { +func sendMetrics(l *log.Logger, env *env, influxDB *log.InfluxDB, blockServices map[msgs.BlockServiceId]*blockServiceEntry, failureDomain string) { metrics := log.MetricsBuilder{} rand := wyhash.New(mrand.Uint64()) alert := l.NewNCAlert(10 * time.Second) @@ -1264,59 +538,57 @@ func sendMetrics(l *log.Logger, env *env, influxDB *log.InfluxDB, blockServices l.Info("sending metrics") metrics.Reset() now := time.Now() - for bsId, bsStats := range env.stats { + for bsId, entry := range blockServices { metrics.Measurement("eggsfs_blocks_write") metrics.Tag("blockservice", bsId.String()) metrics.Tag("failuredomain", failureDomainEscaped) metrics.Tag("pathprefix", env.pathPrefix) - metrics.FieldU64("bytes", bsStats.bytesWritten) - metrics.FieldU64("blocks", bsStats.blocksWritten) + metrics.FieldU64("bytes", atomic.LoadUint64(&entry.BytesWritten)) + metrics.FieldU64("blocks", atomic.LoadUint64(&entry.BlocksWritten)) metrics.Timestamp(now) metrics.Measurement("eggsfs_blocks_read") metrics.Tag("blockservice", bsId.String()) metrics.Tag("failuredomain", failureDomainEscaped) metrics.Tag("pathprefix", env.pathPrefix) - metrics.FieldU64("bytes", bsStats.bytesFetched) - metrics.FieldU64("blocks", bsStats.blocksFetched) + metrics.FieldU64("bytes", atomic.LoadUint64(&entry.BytesFetched)) + metrics.FieldU64("blocks", atomic.LoadUint64(&entry.BlocksFetched)) metrics.Timestamp(now) metrics.Measurement("eggsfs_blocks_erase") metrics.Tag("blockservice", bsId.String()) metrics.Tag("failuredomain", failureDomainEscaped) metrics.Tag("pathprefix", env.pathPrefix) - metrics.FieldU64("blocks", bsStats.blocksErased) + metrics.FieldU64("blocks", atomic.LoadUint64(&entry.BlocksErased)) metrics.Timestamp(now) metrics.Measurement("eggsfs_blocks_check") metrics.Tag("blockservice", bsId.String()) metrics.Tag("failuredomain", failureDomainEscaped) metrics.Tag("pathprefix", env.pathPrefix) - metrics.FieldU64("blocks", bsStats.blocksChecked) - metrics.FieldU64("bytes", bsStats.bytesChecked) + metrics.FieldU64("blocks", atomic.LoadUint64(&entry.BlocksChecked)) + metrics.FieldU64("bytes", atomic.LoadUint64(&entry.BytesChecked)) metrics.Timestamp(now) metrics.Measurement("eggsfs_blocks_errors") metrics.Tag("blockservice", bsId.String()) metrics.Tag("failuredomain", failureDomainEscaped) metrics.Tag("pathprefix", env.pathPrefix) - metrics.FieldU64("bad_block_crc", bsStats.badBlockCrc) - metrics.FieldU64("block_too_old", bsStats.blockTooOldForWrite) - metrics.FieldU64("bad_certificate", bsStats.badCertificate) + metrics.FieldU64("bad_block_crc", atomic.LoadUint64(&entry.badBlockCrc)) + metrics.FieldU64("block_too_old", atomic.LoadUint64(&entry.blockTooOldForWrite)) + metrics.FieldU64("bad_certificate", atomic.LoadUint64(&entry.badCertificate)) metrics.Timestamp(now) - } - for bsId, bsInfo := range blockServices { metrics.Measurement("eggsfs_blocks_storage") metrics.Tag("blockservice", bsId.String()) metrics.Tag("failuredomain", failureDomainEscaped) metrics.Tag("pathprefix", env.pathPrefix) - metrics.Tag("storageclass", bsInfo.storageClass.String()) - metrics.FieldU64("capacity", bsInfo.cachedInfo.CapacityBytes) - metrics.FieldU64("available", bsInfo.cachedInfo.AvailableBytes) - metrics.FieldU64("blocks", bsInfo.cachedInfo.Blocks) - metrics.FieldU64("io_errors", bsInfo.ioErrors) - dm, found := diskMetrics[bsInfo.devId] + metrics.Tag("storageclass", entry.StorageClass.String()) + metrics.FieldU64("capacity", atomic.LoadUint64(&entry.CapacityBytes)) + metrics.FieldU64("available", atomic.LoadUint64(&entry.AvailableBytes)) + metrics.FieldU64("blocks", atomic.LoadUint64(&entry.Blocks)) + metrics.FieldU64("io_errors", atomic.LoadUint64(&entry.IoErrors)) + dm, found := diskMetrics[entry.DevId] if found { metrics.FieldU64("read_ms", dm.readMs) metrics.FieldU64("write_ms", dm.writeMs) @@ -1337,25 +609,6 @@ func sendMetrics(l *log.Logger, env *env, influxDB *log.InfluxDB, blockServices } } -type blockService struct { - path string - devId string - key [16]byte - cipher cipher.Block - storageClass msgs.StorageClass - cachedInfo msgs.RegisterBlockServiceInfo - couldNotUpdateInfoBlocks bool - couldNotUpdateInfoBlocksAlert log.XmonNCAlert - couldNotUpdateInfoCapacity bool - couldNotUpdateInfoCapacityAlert log.XmonNCAlert - ioErrorsAlert log.XmonNCAlert - ioErrors uint64 - requests uint64 - lastIoErrors uint64 - lastRequests uint64 - decommissioned bool -} - func getMountsInfo(log *log.Logger, mountsPath string) (map[string]string, error) { file, err := os.Open(mountsPath) if err != nil { @@ -1373,19 +626,65 @@ func getMountsInfo(log *log.Logger, mountsPath string) (map[string]string, error continue } path := mountFields[4] - // should be major:minor of the mounted disk ret[path] = mountFields[2] } return ret, nil } +func readOrCreateKey(l *log.Logger, dir string) ([16]byte, error) { + keyPath := path.Join(dir, blockservice.SECRET_FILE_NAME) + f, err := os.Open(keyPath) + if os.IsNotExist(err) { + key := blockservice.CreateSecret(l, keyPath) + if key == nil { + return [16]byte{}, fmt.Errorf("failed creating secret at %s", keyPath) + } + return *key, nil + } + if err != nil { + return [16]byte{}, fmt.Errorf("could not open key file %v: %v", keyPath, err) + } + defer f.Close() + var key [16]byte + if _, err := io.ReadFull(f, key[:]); err != nil { + return [16]byte{}, fmt.Errorf("could not read key file %v: %v", keyPath, err) + } + expectedCrc := crc32c.Sum(0, key[:]) + var actualCrc uint32 + if err := binary.Read(f, binary.LittleEndian, &actualCrc); err != nil { + return [16]byte{}, fmt.Errorf("could not read crc from key file %v: %v", keyPath, err) + } + if expectedCrc != actualCrc { + return [16]byte{}, fmt.Errorf("crc mismatch in key file %v: expected %v, got %v", keyPath, expectedCrc, actualCrc) + } + return key, nil +} + +func usage() { + fmt.Fprintf(os.Stderr, "Usage: %v DIRECTORY STORAGE_CLASS...\n\n", os.Args[0]) + description := ` +For each directory/storage class pair specified we'll have one block +service. The block service id for each will be automatically generated +when running for the first time. The failure domain will be the same. + +The intention is that a single blockservice process will service +a storage node. + +Options:` + description = strings.TrimSpace(description) + fmt.Fprintln(os.Stderr, description) + flag.PrintDefaults() +} + +const PAST_CUTOFF time.Duration = 22 * time.Hour +const WRITE_FUTURE_CUTOFF time.Duration = 5 * time.Minute + func main() { flag.Usage = usage failureDomainStr := flag.String("failure-domain", "", "Failure domain") hostname := flag.String("hostname", "", "Hostname (for hardware event reporting)") pathPrefixStr := flag.String("path-prefix", "", "We filter our block service not only by failure domain but also by path prefix") - futureCutoff := flag.Duration("future-cutoff", DEFAULT_FUTURE_CUTOFF, "") var addresses flags.StringArrayFlags flag.Var(&addresses, "addr", "Addresses (up to two) to bind to, and that will be advertised to registry.") verbose := flag.Bool("verbose", false, "") @@ -1404,7 +703,10 @@ func main() { locationId := flag.Uint("location", 10000, "Location ID") ioAlertPercent := flag.Uint("io-alert-percent", 10, "Threshold percent of I/O errors over which we alert") registryConnectionTimeout := flag.Duration("registry-connection-timeout", 10*time.Second, "") + eraseCutoff := flag.Duration("erase-cutoff", time.Minute, "How old a block must be before it can be erased") dscp := flag.Uint("dscp", 0, "DSCP value to set on connections") + maxConcurrentWrites := flag.Int("max-concurrent-writes", 0, "Max concurrent writes per block service (0=unlimited)") + maxConcurrentReads := flag.Int("max-concurrent-reads", 0, "Max concurrent reads per block service (0=unlimited)") flag.Parse() flagErrors := false @@ -1545,7 +847,6 @@ func main() { pprof.StopCPUProfile() } defer stopCpuProfile() - // Save CPU profile if we get killed by a signal signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, syscall.SIGABRT, syscall.SIGSTKFLT, syscall.SIGSYS) go func() { @@ -1560,7 +861,6 @@ func main() { l.Info(" locationId = %v", *locationId) l.Info(" failureDomain = %v", *failureDomainStr) l.Info(" pathPrefix = %v", *pathPrefixStr) - l.Info(" futureCutoff = %v", *futureCutoff) l.Info(" addr = '%v'", addresses) l.Info(" logLevel = %v", level) l.Info(" logFile = '%v'", *logFile) @@ -1572,20 +872,65 @@ func main() { bufPool := bufpool.NewBufPool() env := &env{ bufPool: bufPool, - stats: make(map[msgs.BlockServiceId]*blockServiceStats), - eraseLocks: make(map[msgs.BlockServiceId]*sync.Mutex), failureDomain: *failureDomainStr, pathPrefix: *pathPrefixStr, ioAlertPercent: uint8(*ioAlertPercent), registryConn: client.MakeRegistryConn(l, nil, *registryAddress, 1), } + // Start TCP listeners first to get actual ports + lc := net.ListenConfig{ + Control: func(network, address string, c syscall.RawConn) error { + return c.Control(func(fd uintptr) { + syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1) + }) + }, + } + + listener1, err := lc.Listen(context.Background(), "tcp4", fmt.Sprintf("%v:%v", net.IP(ownIp1[:]), port1)) + if err != nil { + panic(err) + } + defer listener1.Close() + + l.Info("running 1 on %v", listener1.Addr()) + actualPort1 := uint16(listener1.Addr().(*net.TCPAddr).Port) + + var listener2 net.Listener + var actualPort2 uint16 + if len(addresses) == 2 { + listener2, err = lc.Listen(context.Background(), "tcp4", fmt.Sprintf("%v:%v", net.IP(ownIp2[:]), port2)) + if err != nil { + panic(err) + } + defer listener2.Close() + + l.Info("running 2 on %v", listener2.Addr()) + actualPort2 = uint16(listener2.Addr().(*net.TCPAddr).Port) + } + + addrs := msgs.AddrsInfo{ + Addr1: msgs.IpPort{Addrs: ownIp1, Port: actualPort1}, + Addr2: msgs.IpPort{Addrs: ownIp2, Port: actualPort2}, + } + mountsInfo, err := getMountsInfo(l, "/proc/self/mountinfo") if err != nil { l.RaiseAlert("Disk stats for mounted paths will not be collected due to failure collecting mount info: %v", err) } - blockServices := make(map[msgs.BlockServiceId]*blockService) + bsOptions := &blockservice.BlockServiceOptions{ + BufferSize: 1 << 20, + MaxConcurrentWrites: *maxConcurrentWrites, + MaxConcurrentReads: *maxConcurrentReads, + FutureCutoff: WRITE_FUTURE_CUTOFF, + PastCutoff: PAST_CUTOFF, + EraseCutoff: *eraseCutoff, + ReservedCapacityBytes: *reservedStorage, + } + + // Create block services + blockServices := make(map[msgs.BlockServiceId]*blockServiceEntry) var failedBlockServiceCount int for i := 0; i < flag.NArg(); i += 2 { dir := flag.Args()[i] @@ -1594,40 +939,56 @@ func main() { fmt.Fprintf(os.Stderr, "Storage class cannot be EMPTY/INLINE") os.Exit(2) } - key, err := retrieveOrCreateKey(l, dir) + key, err := readOrCreateKey(l, dir) if err != nil { l.Info("%v", err) failedBlockServiceCount++ continue } - id := blockServiceIdFromKey(key) - cipher, err := aes.NewCipher(key[:]) - if err != nil { - panic(fmt.Errorf("could not create AES-128 key: %w", err)) - } + id := blockservice.BlockServiceIdFromKey(key) devId, found := mountsInfo[dir] if !found { devId = "" } - blockServices[id] = &blockService{ - path: dir, - devId: devId, - key: key, - cipher: cipher, - storageClass: storageClass, - couldNotUpdateInfoBlocksAlert: *l.NewNCAlert(time.Second), - couldNotUpdateInfoCapacityAlert: *l.NewNCAlert(time.Second), - ioErrorsAlert: *l.NewNCAlert(time.Second), + bsInfo := msgs.RegisterBlockServiceInfo{ + Id: id, + LocationId: msgs.Location(*locationId), + Addrs: addrs, + StorageClass: storageClass, + SecretKey: key, + Path: dir, + } + if len(env.pathPrefix) > 0 { + bsInfo.Path = fmt.Sprintf("%s:%s", env.pathPrefix, dir) + } + bsInfo.FailureDomain.Name = failureDomain + + bs := blockservice.OpenBlockService(l, bsOptions, bufPool, bsInfo, devId) + blockServices[id] = &blockServiceEntry{ + BlockService: bs, + ioErrorsAlert: *l.NewNCAlert(time.Second), } } - for id, blockService := range blockServices { - l.Info("block service %v at %v, storage class %v", id, blockService.path, blockService.storageClass) + for id, entry := range blockServices { + l.Info("block service %v at %v, storage class %v", id, entry.LocalPath(), entry.StorageClass) } if len(blockServices)+failedBlockServiceCount != flag.NArg()/2 { panic(fmt.Errorf("duplicate block services")) } + // Wait for all block services to become active + for id, entry := range blockServices { + deadline := time.Now().Add(2 * time.Minute) + for !entry.Active() { + if time.Now().After(deadline) { + panic(fmt.Errorf("timed out waiting for block service %v to become active", id)) + } + time.Sleep(100 * time.Millisecond) + } + } + l.Info("all block services active") + // Now ask registry for block services we _had_ before. We need to know this to honor // erase block requests for old block services safely. deadBlockServices := make(map[msgs.BlockServiceId]deadBlockService) @@ -1650,7 +1011,7 @@ func main() { } for i := range registryBlockServices { bs := ®istryBlockServices[i] - ourBs, weHaveBs := blockServices[bs.Id] + _, weHaveBs := blockServices[bs.Id] sameFailureDomain := bs.FailureDomain.Name == failureDomain if len(env.pathPrefix) > 0 { pathParts := strings.Split(bs.Path, ":") @@ -1659,82 +1020,27 @@ func main() { } } isDecommissioned := (bs.Flags & msgs.TERNFS_BLOCK_SERVICE_DECOMMISSIONED) != 0 - // No disagreement on failure domain with registry (otherwise we could end up with - // a split brain scenario where two eggsblocks processes assume control of two dead - // block services) if weHaveBs && !sameFailureDomain { panic(fmt.Errorf("we have block service %v, and we're failure domain %v, but registry thinks it should be failure domain %v. If you've moved this block service, change the failure domain on registry", bs.Id, failureDomain, bs.FailureDomain)) } - // block services in the same failure domain, which we do not have, must be - // decommissioned if !weHaveBs && sameFailureDomain { if !isDecommissioned { panic(fmt.Errorf("registry has block service %v for our failure domain %v, but we don't have this block service, and it is not decommissioned. If the block service is dead, mark it as decommissioned", bs.Id, failureDomain)) } deadBlockServices[bs.Id] = deadBlockService{} } - // we can't have a decommissioned block service if weHaveBs && isDecommissioned { l.ErrorNoAlert("We have block service %v, which is decommissioned according to registry. We will treat it as if it doesn't exist.", bs.Id) + entry := blockServices[bs.Id] + entry.Close() delete(blockServices, bs.Id) deadBlockServices[bs.Id] = deadBlockService{} } - // fill in information from registry, if it's recent enough - if weHaveBs && time.Since(bs.LastSeen.Time()) < maximumRegisterInterval*2 { - // everything else is filled in by initBlockServicesInfo - ourBs.cachedInfo = msgs.RegisterBlockServiceInfo{ - CapacityBytes: bs.CapacityBytes, - AvailableBytes: bs.AvailableBytes, - Blocks: bs.Blocks, - } - } - } - } - - lc := net.ListenConfig{ - Control: func(network, address string, c syscall.RawConn) error { - return c.Control(func(fd uintptr) { - syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1) - }) - }, - } - - listener1, err := lc.Listen(context.Background(), "tcp4", fmt.Sprintf("%v:%v", net.IP(ownIp1[:]), port1)) - - if err != nil { - panic(err) - } - defer listener1.Close() - - l.Info("running 1 on %v", listener1.Addr()) - actualPort1 := uint16(listener1.Addr().(*net.TCPAddr).Port) - - var listener2 net.Listener - var actualPort2 uint16 - if len(addresses) == 2 { - listener2, err = lc.Listen(context.Background(), "tcp4", fmt.Sprintf("%v:%v", net.IP(ownIp2[:]), port2)) - if err != nil { - panic(err) } - defer listener2.Close() - - l.Info("running 2 on %v", listener2.Addr()) - actualPort2 = uint16(listener2.Addr().(*net.TCPAddr).Port) } - initBlockServicesInfo(env, l, msgs.Location(*locationId), msgs.AddrsInfo{Addr1: msgs.IpPort{Addrs: ownIp1, Port: actualPort1}, Addr2: msgs.IpPort{Addrs: ownIp2, Port: actualPort2}}, failureDomain, blockServices, *reservedStorage) - l.Info("finished updating block service info, will now start") - terminateChan := make(chan any) - for bsId := range blockServices { - env.stats[bsId] = &blockServiceStats{} - env.eraseLocks[bsId] = &sync.Mutex{} - } - for bsId := range deadBlockServices { - env.stats[bsId] = &blockServiceStats{} - env.eraseLocks[bsId] = &sync.Mutex{} - } env.counters = make(map[msgs.BlocksMessageKind]*timing.Timings) for _, k := range msgs.AllBlocksMessageKind { env.counters[k] = timing.NewTimings(40, 100*time.Microsecond, 1.5) @@ -1745,16 +1051,6 @@ func main() { registerPeriodically(l, blockServices, env) }() - go func() { - defer func() { lrecover.HandleRecoverChan(l, terminateChan, recover()) }() - updateBlockServiceInfoBlocksForever(l, blockServices) - }() - - go func() { - defer func() { lrecover.HandleRecoverChan(l, terminateChan, recover()) }() - updateBlockServiceInfoCapacityForever(l, blockServices, *reservedStorage) - }() - if influxDB != nil { go func() { defer func() { lrecover.HandleRecoverChan(l, terminateChan, recover()) }() @@ -1784,7 +1080,6 @@ func main() { } if *dscp != 0 { - // check if client wants to override DSCP ipv4Conn := ipv4.NewConn(conn) currentDSCP, err := ipv4Conn.TOS() if err != nil { @@ -1792,8 +1087,6 @@ func main() { return } if currentDSCP>>2 == 0 { - // Client did not set DSCP, so we set it - // Note that we shift left by 2 as the lower 2 bits are used for ECN err = ipv4Conn.SetTOS(int(*dscp) << 2) if err != nil { terminateChan <- err @@ -1804,7 +1097,7 @@ func main() { setupConn(conn) go func() { defer func() { lrecover.HandleRecoverChan(l, terminateChan, recover()) }() - handleRequest(l, env, terminateChan, blockServices, deadBlockServices, conn.(*net.TCPConn), *futureCutoff, *connectionTimeout) + handleRequest(l, env, terminateChan, blockServices, deadBlockServices, conn.(*net.TCPConn), *connectionTimeout) }() } }() @@ -1821,7 +1114,7 @@ func main() { setupConn(conn) go func() { defer func() { lrecover.HandleRecoverChan(l, terminateChan, recover()) }() - handleRequest(l, env, terminateChan, blockServices, deadBlockServices, conn.(*net.TCPConn), *futureCutoff, *connectionTimeout) + handleRequest(l, env, terminateChan, blockServices, deadBlockServices, conn.(*net.TCPConn), *connectionTimeout) }() } }() @@ -1834,127 +1127,3 @@ func main() { } } } - -func eraseFileIfExistsAndSyncDir(path string) error { - err := os.Remove(path) - if err != nil { - return err - } - dir, err := os.Open(filepath.Dir(path)) - if err != nil { - return err - } - defer dir.Close() - return dir.Sync() -} - -func writeBufToTemp(statBytes *uint64, basePath string, buf []byte) (string, error) { - if err := os.Mkdir(basePath, 0777); err != nil && !os.IsExist(err) { - return "", err - } - f, err := os.CreateTemp(basePath, "tmp.") - if err != nil { - return "", err - } - tmpName := f.Name() - defer func() { - if err != nil { - os.Remove(tmpName) - } - f.Close() - }() - bufSize := len(buf) - for len(buf) > 0 { - n, err := f.Write(buf) - if err != nil { - return "", err - } - buf = buf[n:] - } - if err = f.Sync(); err != nil { - return "", err - } - atomic.AddUint64(statBytes, uint64(bufSize)) - return tmpName, err -} - -func verifyCrcFile(log *log.Logger, readBuffer []byte, path string, expectedSize int64, expectedCrc msgs.Crc) error { - f, err := os.Open(path) - if err != nil { - log.Debug("failed opening file %s with error: %v", path, err) - return err - } - defer f.Close() - fs, err := f.Stat() - if err != nil { - log.Debug("failed stating file %s with error: %v", path, err) - return err - } - if fs.Size() != expectedSize { - log.Debug("failed file size when checking crc for file %s. expected size: %d size %d", path, expectedSize, fs.Size()) - return msgs.BAD_BLOCK_CRC - } - return verifyCrcReader(log, readBuffer, f, expectedCrc) -} - -func verifyCrcReader(log *log.Logger, readBuffer []byte, r io.Reader, expectedCrc msgs.Crc) error { - cursor := uint32(0) - remainingData := 0 - actualCrc := uint32(0) - processChunkSize := int(msgs.TERN_PAGE_WITH_CRC_SIZE) - if len(readBuffer) < processChunkSize { - readBuffer = make([]byte, processChunkSize) - } - readerHasMoreData := true - for readerHasMoreData { - read, err := r.Read(readBuffer[remainingData:]) - if err != nil { - if err == io.EOF { - err = nil - readerHasMoreData = false - } else { - log.Debug("failed reading source while checking crc. error: %v", err) - return err - } - } - remainingData += read - cursor += uint32(read) - if remainingData < int(msgs.TERN_PAGE_WITH_CRC_SIZE) { - continue - } - numAvailableChunks := remainingData / processChunkSize - for i := 0; i < numAvailableChunks; i++ { - actualPageCrc := crc32c.Sum(0, readBuffer[i*processChunkSize:i*processChunkSize+int(msgs.TERN_PAGE_SIZE)]) - storedPageCrc := binary.LittleEndian.Uint32(readBuffer[i*processChunkSize+int(msgs.TERN_PAGE_SIZE) : i*processChunkSize+int(msgs.TERN_PAGE_WITH_CRC_SIZE)]) - if storedPageCrc != actualPageCrc { - log.Debug("failed checking crc. incorrect page crc at offset %d, expected %v, got %v", cursor-uint32(remainingData)+uint32(i*processChunkSize), msgs.Crc(storedPageCrc), msgs.Crc(actualPageCrc)) - return msgs.BAD_BLOCK_CRC - } - actualCrc = crc32c.Append(actualCrc, actualPageCrc, int(msgs.TERN_PAGE_SIZE)) - } - copy(readBuffer[:], readBuffer[numAvailableChunks*processChunkSize:remainingData]) - remainingData -= numAvailableChunks * processChunkSize - } - if actualCrc != uint32(expectedCrc) { - log.Debug("failed checking crc. invalid block crc. expected %v, got %v", expectedCrc, msgs.Crc(actualCrc)) - return msgs.BAD_BLOCK_CRC - } - if remainingData > 0 { - log.Debug("failed checking crc. unexpected data left") - return msgs.BAD_BLOCK_CRC - } - return nil -} - -func moveFileAndSyncDir(src, dst string) error { - err := os.Rename(src, dst) - if err != nil { - return err - } - dir, err := os.Open(filepath.Dir(dst)) - if err != nil { - return err - } - defer dir.Close() - return dir.Sync() -} diff --git a/go/terntests/terntests.go b/go/terntests/terntests.go index e6577e5f..bfae2691 100644 --- a/go/terntests/terntests.go +++ b/go/terntests/terntests.go @@ -884,7 +884,7 @@ func (bsv *blockServiceVictim) start( FailureDomain: bsv.failureDomain, LogLevel: log.Level(), RegistryAddress: fmt.Sprintf("127.0.0.1:%d", registryPort), - FutureCutoff: &testBlockFutureCutoff, + EraseCutoff: &testBlockEraseCutoff, Addr1: fmt.Sprintf("127.0.0.1:%d", port1), Addr2: fmt.Sprintf("127.0.0.1:%d", port2), Profile: profile, @@ -1042,7 +1042,7 @@ func newTestClient(log *log.Logger, registryAddress string, counters *client.Cli // 0 interval won't do, because otherwise transient files will immediately be // expired and not picked. var testTransientDeadlineInterval = 30 * time.Second -var testBlockFutureCutoff = testTransientDeadlineInterval / 2 +var testBlockEraseCutoff = testTransientDeadlineInterval / 2 func main() { overrides := make(cfgOverrides)