Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/core/CommonOptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ static inline bool parseLogOptions(CommandLineArgs& args, LogOptions& options) {
options.logLevel = LogLevel::LOG_DEBUG;
} else if (logLevel == "info") {
options.logLevel = LogLevel::LOG_INFO;
} else if (logLevel == "warn") {
options.logLevel = LogLevel::LOG_WARN;
} else if (logLevel == "error") {
options.logLevel = LogLevel::LOG_ERROR;
} else {
Expand Down
3 changes: 3 additions & 0 deletions cpp/core/Env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ std::ostream& operator<<(std::ostream& out, LogLevel ll) {
case LogLevel::LOG_INFO:
out << "INFO";
break;
case LogLevel::LOG_WARN:
out << "WARN";
break;
case LogLevel::LOG_ERROR:
out << "ERROR";
break;
Expand Down
12 changes: 11 additions & 1 deletion cpp/core/Env.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ enum class LogLevel : uint32_t {
LOG_TRACE = 0,
LOG_DEBUG = 1,
LOG_INFO = 2,
LOG_ERROR = 3,
LOG_WARN = 3,
LOG_ERROR = 4,
};

std::ostream& operator<<(std::ostream& out, LogLevel ll);
Expand Down Expand Up @@ -65,6 +66,8 @@ struct Logger {
syslogLevel = 7; break;
case LogLevel::LOG_INFO:
syslogLevel = 6; break;
case LogLevel::LOG_WARN:
syslogLevel = 4; break;
case LogLevel::LOG_ERROR:
syslogLevel = 3; break;
default:
Expand Down Expand Up @@ -181,6 +184,13 @@ struct Env {
} \
} while (false)

#define LOG_WARN(env, ...) \
do { \
if (likely((env).shouldLog(LogLevel::LOG_WARN))) { \
(env)._log(LogLevel::LOG_WARN, VALIDATE_FORMAT(__VA_ARGS__)); \
} \
} while (false)

#define LOG_ERROR(env, ...) \
do { \
if (likely((env).shouldLog(LogLevel::LOG_ERROR))) { \
Expand Down
51 changes: 27 additions & 24 deletions go/core/certificate/blockscert.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,41 @@
package certificate

import (
"bytes"
"crypto/cipher"
"encoding/binary"
"xtx/ternfs/core/cbcmac"
"xtx/ternfs/msgs"
)

func BlockWriteCertificate(cipher cipher.Block, blockServiceId msgs.BlockServiceId, req *msgs.WriteBlockReq) [8]byte {
w := bytes.NewBuffer([]byte{})
binary.Write(w, binary.LittleEndian, uint64(blockServiceId))
w.Write([]byte{'w'})
binary.Write(w, binary.LittleEndian, uint64(req.BlockId))
binary.Write(w, binary.LittleEndian, uint32(req.Crc))
binary.Write(w, binary.LittleEndian, uint32(req.Size))
return cbcmac.CBCMAC(cipher, w.Bytes())
func BlockWriteCertificate(cipher cipher.Block, blockServiceId msgs.BlockServiceId, blockId msgs.BlockId, crc msgs.Crc, size uint32) [8]byte {
var buf [25]byte
binary.LittleEndian.PutUint64(buf[0:8], uint64(blockServiceId))
buf[8] = 'w'
binary.LittleEndian.PutUint64(buf[9:17], uint64(blockId))
binary.LittleEndian.PutUint32(buf[17:21], uint32(crc))
binary.LittleEndian.PutUint32(buf[21:25], size)
return cbcmac.CBCMAC(cipher, buf[:])
}

func CheckBlockWriteCertificate(cipher cipher.Block, blockServiceId msgs.BlockServiceId, req *msgs.WriteBlockReq) ([8]byte, bool) {
expectedMac := BlockWriteCertificate(cipher, blockServiceId, req)
expectedMac := BlockWriteCertificate(cipher, blockServiceId, req.BlockId, req.Crc, req.Size)
return expectedMac, expectedMac == req.Certificate
}

func BlockEraseCertificate(blockServiceId msgs.BlockServiceId, blockId msgs.BlockId, key cipher.Block) [8]byte {
buf := bytes.NewBuffer([]byte{})
// struct.pack_into('<QcQ', b, 0, block['block_service_id'], b'e', block['block_id'])
binary.Write(buf, binary.LittleEndian, uint64(blockServiceId))
buf.Write([]byte{'e'})
binary.Write(buf, binary.LittleEndian, uint64(blockId))
func BlockWriteProof(cipher cipher.Block, blockServiceId msgs.BlockServiceId, blockId msgs.BlockId) [8]byte {
var buf [17]byte
binary.LittleEndian.PutUint64(buf[0:8], uint64(blockServiceId))
buf[8] = 'W'
binary.LittleEndian.PutUint64(buf[9:17], uint64(blockId))
return cbcmac.CBCMAC(cipher, buf[:])
}

return cbcmac.CBCMAC(key, buf.Bytes())
func BlockEraseCertificate(blockServiceId msgs.BlockServiceId, blockId msgs.BlockId, key cipher.Block) [8]byte {
var buf [17]byte
binary.LittleEndian.PutUint64(buf[0:8], uint64(blockServiceId))
buf[8] = 'e'
binary.LittleEndian.PutUint64(buf[9:17], uint64(blockId))
return cbcmac.CBCMAC(key, buf[:])
}

func CheckBlockEraseCertificate(blockServiceId msgs.BlockServiceId, cipher cipher.Block, req *msgs.EraseBlockReq) ([8]byte, bool) {
Expand All @@ -43,13 +48,11 @@ func CheckBlockEraseCertificate(blockServiceId msgs.BlockServiceId, cipher ciphe
}

func BlockEraseProof(blockServiceId msgs.BlockServiceId, blockId msgs.BlockId, key cipher.Block) [8]byte {
buf := bytes.NewBuffer([]byte{})
// struct.pack_into('<QcQ', b, 0, block['block_service_id'], b'E', block['block_id'])
binary.Write(buf, binary.LittleEndian, uint64(blockServiceId))
buf.Write([]byte{'E'})
binary.Write(buf, binary.LittleEndian, uint64(blockId))

return cbcmac.CBCMAC(key, buf.Bytes())
var buf [17]byte
binary.LittleEndian.PutUint64(buf[0:8], uint64(blockServiceId))
buf[8] = 'E'
binary.LittleEndian.PutUint64(buf[9:17], uint64(blockId))
return cbcmac.CBCMAC(key, buf[:])
}

func CheckBlockEraseProof(blockServiceId msgs.BlockServiceId, cipher cipher.Block, req *msgs.EraseBlockReq) ([8]byte, bool) {
Expand Down
12 changes: 11 additions & 1 deletion go/core/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ type LogLevel uint8
const TRACE LogLevel = 0
const DEBUG LogLevel = 1
const INFO LogLevel = 2
const ERROR LogLevel = 3
const WARN LogLevel = 3
const ERROR LogLevel = 4

func (ll LogLevel) String() string {
switch ll {
Expand All @@ -33,6 +34,8 @@ func (ll LogLevel) String() string {
return "DEBUG"
case INFO:
return "INFO"
case WARN:
return "WARN"
case ERROR:
return "ERROR"
default:
Expand Down Expand Up @@ -92,6 +95,9 @@ func (log *Logger) formatLog(level LogLevel, time time.Time, file string, line i
case ERROR:
levelColor = red
syslogPrio = syslogError
case WARN:
levelColor = yellow
syslogPrio = syslogWarn
case INFO:
levelColor = blue
syslogPrio = syslogInfo
Expand Down Expand Up @@ -244,6 +250,10 @@ func (l *Logger) InfoStack(calldepth int, format string, v ...any) {
l.LogStack(1+calldepth, INFO, format, v...)
}

func (l *Logger) Warn(format string, v ...any) {
l.LogStack(1, WARN, format, v...)
}

// There should be very few times where you want an error log but not an alert.
func (l *Logger) ErrorNoAlert(format string, v ...any) {
l.LogStack(1, ERROR, format, v...)
Expand Down
6 changes: 3 additions & 3 deletions go/core/managedprocess/managedprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ type BlockServiceOpts struct {
StorageClasses []msgs.StorageClass
FailureDomain string
Location msgs.Location
FutureCutoff *time.Duration
EraseCutoff *time.Duration
LogLevel log.LogLevel
RegistryAddress string
Profile bool
Expand All @@ -300,8 +300,8 @@ func (procs *ManagedProcesses) StartBlockService(ll *log.Logger, opts *BlockServ
if opts.Addr2 != "" {
args = append(args, "-addr", opts.Addr2)
}
if opts.FutureCutoff != nil {
args = append(args, "-future-cutoff", opts.FutureCutoff.String())
if opts.EraseCutoff != nil {
args = append(args, "-erase-cutoff", opts.EraseCutoff.String())
}
if opts.LogLevel == log.DEBUG {
args = append(args, "-verbose")
Expand Down
2 changes: 1 addition & 1 deletion go/msgs/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (id BlockId) Path() string {
h := sha1.New()
h.Write([]byte(hex))
dir := fmt.Sprintf("%02x", h.Sum(nil)[0])
return path.Join("with_crc", dir, hex)
return path.Join(dir, hex)
}

func MakeInodeId(typ InodeType, shard ShardId, id uint64) InodeId {
Expand Down
103 changes: 103 additions & 0 deletions go/ternblocks/blockservice/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2025 XTX Markets Technologies Limited
//
// SPDX-License-Identifier: GPL-2.0-or-later

package blockservice

import (
"errors"
"fmt"
"io"
"os"
"strings"
"sync/atomic"
"syscall"
"xtx/ternfs/msgs"

"golang.org/x/sys/unix"
)

type blockReader struct {
*pageCrcReader
bs *BlockService
}

func newBlockReader(bs *BlockService, path string, offset uint32, count uint32, readAhead bool, stripCrc bool) (*blockReader, error) {
bs.logger.Debug("fetching block at path %v", path)
f, err := os.Open(path)
defer func() {
if f != nil {
f.Close()
}
}()

if os.IsNotExist(err) {
return nil, msgs.BLOCK_NOT_FOUND
}

if errors.Is(err, syscall.ENODATA) {
atomic.AddUint64(&bs.IoAttempts, 1)
atomic.AddUint64(&bs.IoErrors, 1)
bs.logger.RaiseHardwareEvent(strings.Split(bs.Path, ":")[0], bs.Id.String(),
fmt.Sprintf("could not open block %v, got ENODATA, this probably means that the block/disk is gone", path))
return nil, syscall.EIO
}

if err != nil {
return nil, err
}

if offset%msgs.TERN_PAGE_SIZE != 0 {
bs.logger.Warn("trying to read from offset other than page boundary for path %v", path)
return nil, msgs.BLOCK_FETCH_OUT_OF_BOUNDS
}
if count%msgs.TERN_PAGE_SIZE != 0 {
bs.logger.Warn("trying to read count not multiple of page size for path %v", path)
return nil, msgs.BLOCK_FETCH_OUT_OF_BOUNDS
}

fileOffset := int64(offset / msgs.TERN_PAGE_SIZE * msgs.TERN_PAGE_WITH_CRC_SIZE)
var pos int64
if pos, err = f.Seek(fileOffset, 0); err != nil {
return nil, err
}
if pos != fileOffset {
return nil, msgs.BLOCK_FETCH_OUT_OF_BOUNDS
}
adviseCount := int64(count / msgs.TERN_PAGE_SIZE * msgs.TERN_PAGE_WITH_CRC_SIZE)
if readAhead {
var stat unix.Stat_t
if err := unix.Fstat(int(f.Fd()), &stat); err == nil {
adviseCount = stat.Size - fileOffset
}
unix.Fadvise(int(f.Fd()), fileOffset, adviseCount, unix.FADV_SEQUENTIAL|unix.FADV_WILLNEED)
}

bs.wg.Add(1)
if bs.readC != nil {
bs.readC <- struct{}{}
}
reader := NewPageCrcReader(f, stripCrc)
f = nil
return &blockReader{
pageCrcReader: reader,
bs: bs,
}, nil
}

func (r *blockReader) Read(p []byte) (int, error) {
atomic.AddUint64(&r.bs.IoAttempts, 1)
read, err := r.pageCrcReader.Read(p)
if err != nil && err != io.EOF {
atomic.AddUint64(&r.bs.IoErrors, 1)
}
return read, err
}

func (r *blockReader) Close() error {
if r.bs.readC != nil {
<-r.bs.readC
}
r.bs.wg.Done()
return r.pageCrcReader.Close()
}
Loading
Loading