Skip to content
3 changes: 2 additions & 1 deletion cmd/index_analyzer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func analyzeIndex(
if err := b.Unpack(readBlock()); err != nil {
logger.Fatal("error unpacking block info", zap.Error(err))
}
ver := b.Info.BinaryDataVer

docsCount := int(b.Info.DocsTotal)

Expand Down Expand Up @@ -162,7 +163,7 @@ func analyzeIndex(
}

block := &lids.Block{}
if err := block.Unpack(data, &lids.UnpackBuffer{}); err != nil {
if err := block.Unpack(data, ver, &lids.UnpackBuffer{}); err != nil {
logger.Fatal("error unpacking lids block", zap.Error(err))
}

Expand Down
4 changes: 3 additions & 1 deletion config/frac_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const (
BinaryDataV1
// BinaryDataV2 - MIDs stored in nanoseconds
BinaryDataV2
// BinaryDataV3 - delta bitpack encoded MIDs and LIDs
BinaryDataV3
)

const CurrentFracVersion = BinaryDataV2
const CurrentFracVersion = BinaryDataV3
4 changes: 2 additions & 2 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,8 +1841,8 @@ func (s *FractionTestSuite) TestFractionInfo() {
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
case *Remote:
s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value")
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1500),
"index on disk doesn't match. actual value: %d", info.MetaOnDisk)
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1550),
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
default:
s.Require().Fail("unsupported fraction type")
}
Expand Down
2 changes: 1 addition & 1 deletion frac/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e
docsReader: &f.docsReader,
blocksOffsets: f.blocksData.BlocksOffsets,
lidsTable: f.blocksData.LIDsTable,
lidsLoader: lids.NewLoader(&f.indexReader, f.indexCache.LIDs),
lidsLoader: lids.NewLoader(f.info.BinaryDataVer, &f.indexReader, f.indexCache.LIDs),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &f.indexReader, f.indexCache.Tokens),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, &f.indexReader, f.indexCache.TokenTable),

Expand Down
2 changes: 1 addition & 1 deletion frac/sealed.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider {
docsReader: &f.docsReader,
blocksOffsets: f.blocksData.BlocksOffsets,
lidsTable: f.blocksData.LIDsTable,
lidsLoader: lids.NewLoader(&f.indexReader, f.indexCache.LIDs),
lidsLoader: lids.NewLoader(f.info.BinaryDataVer, &f.indexReader, f.indexCache.LIDs),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &f.indexReader, f.indexCache.Tokens),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, &f.indexReader, f.indexCache.TokenTable),

Expand Down
52 changes: 33 additions & 19 deletions frac/sealed/lids/block.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package lids

import (
"encoding/binary"
"math"
"unsafe"

"github.com/ozontech/seq-db/config"
"github.com/ozontech/seq-db/packer"
)

Expand All @@ -23,21 +23,9 @@ func (b *Block) getLIDs(i int) []uint32 {
return b.LIDs[b.Offsets[i]:b.Offsets[i+1]]
}

func (b *Block) Pack(dst []byte) []byte {
lastLID := int64(0)
last := b.getCount() - 1
for i := 0; i <= last; i++ {
for _, lid := range b.getLIDs(i) {
dst = binary.AppendVarint(dst, int64(lid)-lastLID)
lastLID = int64(lid)
}

if i < last || b.IsLastLID {
// when we add this value to prev we must get -1 (or math.MaxUint32 for uint32)
// it is the end-marker; see `Block.Unpack()`
dst = binary.AppendVarint(dst, -1-lastLID)
}
}
// Pack appends the serialized block to dst and returns the extended slice.
//
// The layout is two delta-bitpack sections written back to back: first
// b.Offsets, then b.LIDs. buf is handed to the packer on both calls
// (presumably as reusable scratch space — confirm against the packer package).
//
// NOTE(review): b.IsLastLID is not written by this method; confirm that the
// bitpack format stores it elsewhere or no longer needs it.
func (b *Block) Pack(dst []byte, buf []uint32) []byte {
	withOffsets := packer.CompressDeltaBitpackUint32(dst, b.Offsets, buf)
	return packer.CompressDeltaBitpackUint32(withOffsets, b.LIDs, buf)
}

Expand All @@ -49,13 +37,39 @@ func (b *Block) GetSizeBytes() int {
return blockSize + uint32Size*cap(b.LIDs) + uint32Size*cap(b.Offsets)
}

func (b *Block) Unpack(data []byte, buf *UnpackBuffer) error {
// Unpack decodes data into b, choosing the decoder by the fraction's binary
// data version: versions below config.BinaryDataV3 use the varint decoder,
// BinaryDataV3 and newer use the delta-bitpack decoder.
//
// buf is reset for fracVer before decoding and is passed to the selected
// decoder. Any decoder error is returned unchanged.
func (b *Block) Unpack(data []byte, fracVer config.BinaryDataVersion, buf *UnpackBuffer) error {
	buf.Reset(fracVer)

	if fracVer < config.BinaryDataV3 {
		return b.unpackVarint(data, buf)
	}
	return b.unpackBitpack(data, buf)
}

// unpackBitpack decodes a block made of two consecutive delta-bitpack
// sections: offsets first, then LIDs. Each decoded section is copied into a
// freshly allocated slice owned by b.
//
// The first decoder error is returned as is; in that case the fields decoded
// so far may already have been overwritten.
//
// NOTE(review): unlike unpackVarint, this path never assigns b.IsLastLID —
// confirm that callers of bitpack blocks do not rely on it.
func (b *Block) unpackBitpack(data []byte, buf *UnpackBuffer) error {
	rest, offsets, err := packer.DecompressDeltaBitpackUint32(data, buf.decompressed, buf.compressed)
	if err != nil {
		return err
	}
	// Copy before decoding the next section: the same scratch buffers are
	// passed again below, so the returned slice presumably aliases them
	// (TODO confirm against the packer package).
	b.Offsets = append([]uint32{}, offsets...)

	_, lids, err := packer.DecompressDeltaBitpackUint32(rest, buf.decompressed, buf.compressed)
	if err != nil {
		return err
	}
	b.LIDs = append([]uint32{}, lids...)

	return nil
}

func (b *Block) unpackVarint(data []byte, buf *UnpackBuffer) error {
var lid, offset uint32

b.IsLastLID = true

buf.lids = buf.lids[:0]
buf.offsets = buf.offsets[:0]
buf.offsets = append(buf.offsets, 0) // first offset is always zero

unpacker := packer.NewBytesUnpacker(data)
Expand Down
226 changes: 226 additions & 0 deletions frac/sealed/lids/block_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
package lids

import (
"math"
"math/rand"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ozontech/seq-db/config"
)

// TestBlockPack round-trips blocks of various sizes through Pack and Unpack
// (using config.CurrentFracVersion) and checks that the exported fields of the
// decoded block equal those of the source block.
//
// A case either provides lids/offsets directly or a build function that
// produces them; when build is set it takes precedence.
func TestBlockPack(t *testing.T) {
	cases := []struct {
		name    string
		lids    []uint32
		offsets []uint32
		build   func() ([]uint32, []uint32)
	}{
		{
			name:    "small_single_token",
			lids:    generate(4),
			offsets: []uint32{0, 4},
		},
		{
			name:    "small_a_few_token",
			lids:    generate(6),
			offsets: []uint32{0, 3, 6},
		},
		{
			name:    "small_single_lid",
			lids:    []uint32{100},
			offsets: []uint32{0, 1},
		},
		{
			// Values close to the top of the uint32 range.
			name:    "small_big_lids",
			lids:    []uint32{math.MaxUint32 - 100, math.MaxUint32 - 50, math.MaxUint32 - 10},
			offsets: []uint32{0, 3},
		},
		{
			name:    "small_few_tokens",
			lids:    generate(8),
			offsets: []uint32{0, 3, 6, 8},
		},
		{
			// Ten groups of three LIDs each; the base grows by 30 per group.
			name: "medium_many_tokens",
			build: func() ([]uint32, []uint32) {
				lids := make([]uint32, 0)
				offsets := []uint32{0}
				base := uint32(100)
				for group := range 10 {
					for k := range 3 {
						lids = append(lids, base+uint32(group*10+k))
					}
					offsets = append(offsets, uint32(len(lids)))
					base += 30
				}
				return lids, offsets
			},
		},
		{
			// Five groups of thirty LIDs each, spaced by 10.
			name: "large_many_tokens",
			build: func() ([]uint32, []uint32) {
				const groupSize = 30
				lids := make([]uint32, 0, 150)
				offsets := []uint32{0}
				for group := range 5 {
					for k := range groupSize {
						lids = append(lids, 1+uint32(group*groupSize*10+k*10))
					}
					offsets = append(offsets, uint32(len(lids)))
				}
				return lids, offsets
			},
		},
		// Sizes at and around 128, 4096 and 65536 elements.
		{
			name:    "medium_128_lids",
			lids:    generate(128),
			offsets: []uint32{0, 128},
		},
		{
			name:    "medium_127_lids",
			lids:    generate(127),
			offsets: []uint32{0, 127},
		},
		{
			name:    "medium_129_lids",
			lids:    generate(129),
			offsets: []uint32{0, 129},
		},
		{
			name:    "medium_4k_lids",
			lids:    generate(4096),
			offsets: []uint32{0, 4096},
		},
		{
			name:    "medium_4k_minus_one_lids",
			lids:    generate(4095),
			offsets: []uint32{0, 10, 50, 100, 150, 190, 1000, 1500, 4095},
		},
		{
			name:    "medium_4k_plus_one_lids",
			lids:    generate(4097),
			offsets: []uint32{0, 10, 50, 100, 150, 190, 1000, 1500, 4097},
		},
		{
			name:    "medium_64k_lids",
			lids:    generate(65536),
			offsets: []uint32{0, 65536},
		},
		{
			name:    "medium_64k_minus_one_lids",
			lids:    generate(65535),
			offsets: []uint32{0, 10, 50, 100, 150, 190, 1000, 1500, 65535},
		},
		{
			name:    "medium_64k_plus_one_lids",
			lids:    generate(65537),
			offsets: []uint32{0, 10, 50, 100, 150, 190, 1000, 1500, 65537},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			lids, offsets := tc.lids, tc.offsets
			if tc.build != nil {
				lids, offsets = tc.build()
			}

			want := &Block{LIDs: lids, Offsets: offsets}

			packed := want.Pack(nil, nil)
			require.NotEmpty(t, packed)

			got := &Block{}
			err := got.Unpack(packed, config.CurrentFracVersion, &UnpackBuffer{})

			require.NoError(t, err)
			assert.EqualExportedValues(t, want, got)
		})
	}
}

// generate returns n strictly increasing values. The first value is 100 and
// every following value is larger than its predecessor by a random step in
// the range 1..5, drawn from the package-level math/rand source.
func generate(n int) []uint32 {
	out := make([]uint32, n)
	cur := uint32(100)
	for i := 0; i < len(out); i++ {
		out[i] = cur
		step := 1 + rand.Intn(5) // always at least 1, so the sequence never repeats a value
		cur += uint32(step)
	}
	return out
}

// TestBlockPack_ReuseBuffer checks that a single pack scratch slice and a
// single UnpackBuffer can serve several blocks in a row, and that decoding a
// later block neither inherits data from nor corrupts an earlier result.
func TestBlockPack_ReuseBuffer(t *testing.T) {
	const lidsCount = 64 * 1024

	// The last offset equals len(LIDs), matching how getLIDs slices LIDs by
	// offsets. The two blocks carry a different number of offsets so that
	// stale offsets left in a reused buffer would be detected.
	block1 := &Block{
		LIDs:    generate(lidsCount),
		Offsets: []uint32{0, lidsCount},
	}
	block2 := &Block{
		LIDs:    generate(lidsCount),
		Offsets: []uint32{0, lidsCount / 2, lidsCount},
	}

	// Pack receives the slice header by value, so the same scratch slice can
	// be passed again without re-slicing it.
	packBuf := make([]uint32, 0, lidsCount)
	packed1 := block1.Pack(nil, packBuf)
	packed2 := block2.Pack(nil, packBuf)

	unpackBuf := &UnpackBuffer{}

	unpacked1 := &Block{}
	err := unpacked1.Unpack(packed1, config.CurrentFracVersion, unpackBuf)
	require.NoError(t, err)
	assert.Equal(t, block1.LIDs, unpacked1.LIDs)
	assert.Equal(t, block1.Offsets, unpacked1.Offsets)

	unpacked2 := &Block{}
	err = unpacked2.Unpack(packed2, config.CurrentFracVersion, unpackBuf)
	require.NoError(t, err)
	assert.Equal(t, block2.LIDs, unpacked2.LIDs)
	assert.Equal(t, block2.Offsets, unpacked2.Offsets)

	// The first result must not alias the reused buffer: it has to survive
	// the second Unpack unchanged.
	assert.Equal(t, block1.LIDs, unpacked1.LIDs)
	assert.Equal(t, block1.Offsets, unpacked1.Offsets)
}

func BenchmarkBlock_Pack(b *testing.B) {
lids := generate(64 * 1024)

block := &Block{
LIDs: lids,
Offsets: []uint32{0, 64 * 1024},
}
tmp := make([]uint32, 0, 64*1024/4)

for b.Loop() {
block.Pack(nil, tmp)
}
}

func BenchmarkBlock_Unpack(b *testing.B) {
lids := generate(64 * 1024)

block := &Block{
LIDs: lids,
Offsets: []uint32{0, 64 * 1024},
}
packed := block.Pack(nil, nil)

buf := &UnpackBuffer{}
unpacked := &Block{}

b.ResetTimer()
for b.Loop() {
err := unpacked.Unpack(packed, config.CurrentFracVersion, buf)
assert.NoError(b, err)
}
}
Loading
Loading