Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 45 additions & 45 deletions frac/sealed.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,51 @@ func NewSealed(
return f
}

func NewSealedPreloaded(
baseFile string,
preloaded *sealed.PreloadedData,
rl *storage.ReadLimiter,
indexCache *IndexCache,
docsCache *cache.Cache[[]byte],
config *Config,
) *Sealed {
f := &Sealed{
blocksData: preloaded.BlocksData,
docsCache: docsCache,
indexCache: indexCache,

loadMu: &sync.RWMutex{},
isLoaded: true,

readLimiter: rl,

info: preloaded.Info,
BaseFileName: baseFile,
Config: config,
}

// Put token table built during sealing into the cache.
indexCache.TokenTable.Get(token.CacheKeyTable, func() (token.Table, int) {
return preloaded.TokenTable, preloaded.TokenTable.Size()
})

f.openDocs()
f.openIndex()

docsCountK := float64(f.info.DocsTotal) / 1000
logger.Info("sealed fraction created from active",
zap.String("frac", f.info.Name()),
util.ZapMsTsAsESTimeStr("creation_time", f.info.CreationTime),
zap.String("from", f.info.From.String()),
zap.String("to", f.info.To.String()),
util.ZapFloat64WithPrec("docs_k", docsCountK, 1),
)

f.info.MetaOnDisk = 0

return f
}

func (f *Sealed) openInfo() {
if f.IsLegacy {
if f.legacyFile != nil {
Expand Down Expand Up @@ -235,51 +280,6 @@ func (f *Sealed) openDocs() {
f.docsReader = storage.NewDocsReader(f.readLimiter, f.docsFile, f.docsCache)
}

func NewSealedPreloaded(
baseFile string,
preloaded *sealed.PreloadedData,
rl *storage.ReadLimiter,
indexCache *IndexCache,
docsCache *cache.Cache[[]byte],
config *Config,
) *Sealed {
f := &Sealed{
blocksData: preloaded.BlocksData,
docsCache: docsCache,
indexCache: indexCache,

loadMu: &sync.RWMutex{},
isLoaded: true,

readLimiter: rl,

info: preloaded.Info,
BaseFileName: baseFile,
Config: config,
}

// Put token table built during sealing into the cache.
indexCache.TokenTable.Get(token.CacheKeyTable, func() (token.Table, int) {
return preloaded.TokenTable, preloaded.TokenTable.Size()
})

f.openDocs()
f.openIndex()

docsCountK := float64(f.info.DocsTotal) / 1000
logger.Info("sealed fraction created from active",
zap.String("frac", f.info.Name()),
util.ZapMsTsAsESTimeStr("creation_time", f.info.CreationTime),
zap.String("from", f.info.From.String()),
zap.String("to", f.info.To.String()),
util.ZapFloat64WithPrec("docs_k", docsCountK, 1),
)

f.info.MetaOnDisk = 0

return f
}

func (f *Sealed) load() {
f.loadMu.Lock()
defer f.loadMu.Unlock()
Expand Down
1 change: 0 additions & 1 deletion frac/sealed/seqids/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

type Table struct {
MinBlockIDs []seq.ID // from max to min
IDBlocksTotal uint32
IDsTotal uint32
StartBlockIndex uint32
}
Expand Down
20 changes: 9 additions & 11 deletions frac/sealed_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (l *LegacyLoader) Load(blocksData *sealed.BlocksData, info *common.Info, re
l.skipSection() // skip token table blocks

var err error
blocksData.IDsTable, blocksData.BlocksOffsets, err = l.loadIDs(info.BinaryDataVer)
blocksData.IDsTable, blocksData.BlocksOffsets, err = l.loadIDs(info)
if err != nil {
logger.Fatal("legacy load ids error", zap.Error(err))
}
Expand Down Expand Up @@ -77,7 +77,7 @@ func (l *LegacyLoader) skipSection() {
}

// loadIDs reads the BlockOffsets block and then scans MID/RID/Pos triplets.
func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Table, []uint64, error) {
func (l *LegacyLoader) loadIDs(info *common.Info) (seqids.Table, []uint64, error) {
var buf []byte

data, _, err := l.reader.ReadIndexBlock(l.blockIndex, buf)
Expand All @@ -94,9 +94,8 @@ func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Tab
l.blockIndex++

table := seqids.Table{
StartBlockIndex: l.blockIndex, // absolute index of first MID block in .index
IDsTotal: offsets.IDsTotal,
IDBlocksTotal: uint32(len(offsets.Offsets)),
StartBlockIndex: l.blockIndex, // absolute index of first MID block in .index
IDsTotal: info.DocsTotal + 1, // increment by one for [seq.SystemID]
}

for {
Expand All @@ -111,7 +110,7 @@ func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Tab
}

mid := seq.MID(h.GetExt1())
if fracVersion < config.BinaryDataV2 {
if info.BinaryDataVer < config.BinaryDataV2 {
mid = seq.MillisToMID(h.GetExt1())
}

Expand Down Expand Up @@ -187,7 +186,7 @@ func (l *Loader) Load(blocksData *sealed.BlocksData, info *common.Info, readers
}

blocksData.BlocksOffsets = blockOffsets.Offsets
blocksData.IDsTable = l.loadIDsTable(readers.ID, blockOffsets.IDsTotal, info.BinaryDataVer)
blocksData.IDsTable = l.loadIDsTable(readers.ID, info)

blocksData.LIDsTable, err = l.loadLIDsTable(readers.LID)
if err != nil {
Expand Down Expand Up @@ -228,10 +227,10 @@ func (l *Loader) loadBlocksOffsets(r storage.IndexReader) (sealed.BlockOffsets,

// loadIDsTable scans block headers in the .id file to build seqids.Table.
// Blocks are stored as (MIDs, RIDs, Pos) triplets; we only need MIDs headers.
func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersion config.BinaryDataVersion) seqids.Table {
func (l *Loader) loadIDsTable(r storage.IndexReader, info *common.Info) seqids.Table {
table := seqids.Table{
StartBlockIndex: 0,
IDsTotal: idsTotal,
IDsTotal: info.DocsTotal + 1, // increment by one for [seq.SystemID]
}

for blockIdx := uint32(0); ; {
Expand All @@ -244,7 +243,7 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio
}

var mid seq.MID
if fracVersion < config.BinaryDataV2 {
if info.BinaryDataVer < config.BinaryDataV2 {
mid = seq.MillisToMID(header.GetExt1())
} else {
mid = seq.MID(header.GetExt1())
Expand All @@ -255,7 +254,6 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio
RID: seq.RID(header.GetExt2()),
})

table.IDBlocksTotal++
blockIdx += 3 // skip RIDs and Pos blocks
}

Expand Down
11 changes: 2 additions & 9 deletions indexwriter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,7 @@ func (s *IndexWriter) WriteOffsetsFile(ws io.WriteSeeker, src Source) error {
}
defer w.release()

offsets := sealed.BlockOffsets{
IDsTotal: src.Info().DocsTotal + 1,
Offsets: src.BlockOffsets(),
}

offsets := sealed.BlockOffsets{Offsets: src.BlockOffsets()}
if err := w.writeBlock(btypeOffset, s.packBlocksOffsetsBlock(offsets)); err != nil {
return err
}
Expand Down Expand Up @@ -250,6 +246,7 @@ func (s *IndexWriter) newIndexBlockZSTD(raw []byte, level int) indexBlock {

// packInfoBlock packs fraction information into an index block.
func (s *IndexWriter) packInfoBlock(block sealed.BlockInfo) indexBlock {
s.idsTable.IDsTotal = block.Info.DocsTotal + 1 // Increment by one for [seq.SystemID]
s.buf1 = block.Pack(s.buf1[:0])
return newIndexBlock(s.buf1) // Info block is typically small, no compression
}
Expand All @@ -274,10 +271,6 @@ func (s *IndexWriter) packTokenTableBlock(tokenTableBlock token.TableBlock) inde

// packBlocksOffsetsBlock packs document block offsets into a compressed index block.
func (s *IndexWriter) packBlocksOffsetsBlock(block sealed.BlockOffsets) indexBlock {
// Update IDs table for PreloadedData
s.idsTable.IDsTotal = block.IDsTotal // Total number of IDs
s.idsTable.IDBlocksTotal = uint32(len(block.Offsets)) // Number of ID blocks

// Packing block
s.buf1 = block.Pack(s.buf1[:0])
b := s.newIndexBlockZSTD(s.buf1, s.params.DocsPositionsZstdLevel)
Expand Down
9 changes: 3 additions & 6 deletions sealing/sealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) {

util.MustSyncPath(filepath.Dir(info.Path))

// Compute total index size as sum of all 5 files.
var totalSize uint64
info.IndexOnDisk = 0
for _, suffix := range []string{
consts.InfoFileSuffix,
consts.TokenFileSuffix,
Expand All @@ -123,18 +122,16 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) {
if err != nil {
return nil, err
}
totalSize += uint64(st.Size())
info.IndexOnDisk += uint64(st.Size())
}

info.IndexOnDisk = totalSize
lidsTable := sealer.LIDsTable()

preloaded := &sealed.PreloadedData{
Info: info,
TokenTable: sealer.TokenTable(),
BlocksData: sealed.BlocksData{
IDsTable: sealer.IDsTable(),
LIDsTable: &lidsTable,
IDsTable: sealer.IDsTable(),
BlocksOffsets: src.BlockOffsets(),
},
}
Expand Down