Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 103 additions & 48 deletions src/dbnode/persist/fs/commitlog/commit_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import (
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -149,17 +150,6 @@
// begin a new hot-swap.
*sync.WaitGroup
writer commitLogWriter
// Each writer maintains its own slice of pending flushFns because each writer will get
// flushed independently. This is important for maintaining correctness in code paths
// that care about durability, particularly during commitlog rotations.
//
// For example, imagine a call to WriteWait() occurs and the pending write is buffered
// in commitlog 1, but not yet flushed. Subsequently, a call to RotateLogs() occurs causing
// commitlog 1 to be (asynchronously) reset and commitlog 2 to become the new primary. Once
// the asynchronous Close and flush of commitlog 1 completes, only pending flushFns associated
// with commitlog 1 should be called as the writer associated with commitlog 2 may not have been
// flushed at all yet.
pendingFlushFns []callbackFn
}

func (w *asyncResettableWriter) onFlush(err error) {
Expand All @@ -177,10 +167,17 @@
queueCapacity tally.Gauge
success tally.Counter
errors tally.Counter
openSuccess tally.Counter
openErrors tally.Counter
openAsyncErrors tally.Counter
openAsyncSuccess tally.Counter
openLatency tally.Histogram
closeErrors tally.Counter
closeLatency tally.Histogram
resetLatency tally.Histogram
flushErrors tally.Counter
flushDone tally.Counter
flushTimerTask tally.Counter
}

type eventType int
Expand Down Expand Up @@ -238,6 +235,31 @@
return r.rotateLogs, nil
}

func zeroToTenSecondsHighResDurationBuckets() tally.Buckets {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a lot of buckets, any concerns for cardinality / instrumentation perf here?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm actually looks like the other buckets we have is ~60 so this is probably fine

var buckets tally.DurationBuckets
// From (zero, 1ms).
for i := 1; i < 10; i++ {
buckets = append(buckets, time.Duration(i)*100*time.Microsecond)
}
// From [1ms, 10ms).
for i := 1; i < 10; i++ {
buckets = append(buckets, time.Duration(i)*time.Millisecond)
}
// From [10ms, 100ms).
for i := 1; i < 10; i++ {
buckets = append(buckets, time.Duration(i)*10*time.Millisecond)
}
// From [100ms, 1s).
for i := 1; i < 10; i++ {
buckets = append(buckets, time.Duration(i)*100*time.Millisecond)
}
// From [1s, 10s].
for i := 1; i <= 10; i++ {
buckets = append(buckets, time.Duration(i)*time.Second)
}
return buckets
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a +Inf upper bound value here?

}

type commitLogWrite struct {
eventType eventType
write writeOrWriteBatch
Expand Down Expand Up @@ -275,10 +297,20 @@
queueCapacity: scope.Gauge("writes.queue-capacity"),
success: scope.Counter("writes.success"),
errors: scope.Counter("writes.errors"),
openSuccess: scope.Counter("writes.open-success"),
openErrors: scope.Counter("writes.open-errors"),
openLatency: scope.Histogram("writes.open-latency",
zeroToTenSecondsHighResDurationBuckets()),
openAsyncSuccess: scope.Counter("writes.open-async-success"),
openAsyncErrors: scope.Counter("writes.open-async-errors"),
closeErrors: scope.Counter("writes.close-errors"),
flushErrors: scope.Counter("writes.flush-errors"),
flushDone: scope.Counter("writes.flush-done"),
closeLatency: scope.Histogram("writes.close-latency",
zeroToTenSecondsHighResDurationBuckets()),
resetLatency: scope.Histogram("writes.reset-latency",
zeroToTenSecondsHighResDurationBuckets()),
flushErrors: scope.Counter("writes.flush-errors"),
flushDone: scope.Counter("writes.flush-done"),
flushTimerTask: scope.Counter("writes.flush-timer-task"),
},
}
// Setup backreferences for onFlush().
Expand Down Expand Up @@ -451,10 +483,31 @@
// any allocations.
var singleBatch = make([]ts.BatchWrite, 1)
var batch []ts.BatchWrite
finalizeCh := make(chan ts.WriteBatch, 1024)
defer close(finalizeCh)

go func() {
// Finalize batch writes without allocating.
for batch := range finalizeCh {
batch.Finalize()
}
}()

// Lock this current goroutine to the OS thread so it isn't pre-empted
// and interrupted during the critical write/encode loop.
runtime.LockOSThread()
defer runtime.UnlockOSThread()

for write := range l.writes {
if write.eventType == flushEventType {
l.writerState.primary.writer.Flush(false)
l.metrics.flushTimerTask.Inc(1)

if err := l.writerState.primary.writer.Flush(false); err != nil {
l.log.Error("failed to flush commit log", zap.Error(err))
if l.commitLogFailFn != nil {
l.commitLogFailFn(err)

Check warning on line 508 in src/dbnode/persist/fs/commitlog/commit_log.go

View check run for this annotation

Codecov / codecov/patch

src/dbnode/persist/fs/commitlog/commit_log.go#L506-L508

Added lines #L506 - L508 were not covered by tests
}
}
continue
}

Expand All @@ -469,12 +522,6 @@
continue
}

// For writes requiring acks add to pending acks
if write.eventType == writeEventType && write.callbackFn != nil {
l.writerState.primary.pendingFlushFns = append(
l.writerState.primary.pendingFlushFns, write.callbackFn)
}

isRotateLogsEvent := write.eventType == rotateLogsEventType
if isRotateLogsEvent {
primaryFile, _, err := l.openWriters()
Expand All @@ -486,6 +533,8 @@
if l.commitLogFailFn != nil {
l.commitLogFailFn(err)
}
} else {
l.metrics.openSuccess.Inc(1)
}

write.callbackFn(callbackResult{
Expand All @@ -512,7 +561,13 @@
}
numDequeued = len(batch)

for _, writeBatch := range batch {
lastWritableElem := -1
for i := numDequeued - 1; i >= 0 && lastWritableElem == -1; i-- {
if batch[i].Err == nil && !batch[i].SkipWrite {
lastWritableElem = i
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can this break instead of having lastWritableElem == -1 in the guard? makes intentions clearer, and one fewer check per iteration is a bonus

}
}
for i, writeBatch := range batch {
if writeBatch.Err != nil {
// This entry was not written successfully to the in-memory datastructures so
// we should not persist it to the commitlog. This is important to maintain
Expand All @@ -529,19 +584,33 @@
continue
}

// Only call the callback if last in the batch to avoid
// double calling callback.
var callback callbackFn
if i == lastWritableElem {
callback = write.callbackFn
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be useful adding a metric here to see how often this hits?

}

write := writeBatch.Write
err := l.writerState.primary.writer.Write(write.Series,
write.Datapoint, write.Unit, write.Annotation)
write.Datapoint, write.Unit, write.Annotation, callback)
if err != nil {
l.handleWriteErr(err)
continue
}
numWritesSuccess++
}
if lastWritableElem == -1 && write.callbackFn != nil {
// Call callback successfully if no elements actually needed to write.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be useful adding a metric here to see how often this hits?

write.callbackFn(callbackResult{
eventType: flushEventType,
})

Check warning on line 607 in src/dbnode/persist/fs/commitlog/commit_log.go

View check run for this annotation

Codecov / codecov/patch

src/dbnode/persist/fs/commitlog/commit_log.go#L605-L607

Added lines #L605 - L607 were not covered by tests
}

// Return the write batch to the pool.
if write.write.writeBatch != nil {
write.write.writeBatch.Finalize()
// NB(r): Do not block the write loop finalizing resources.
finalizeCh <- write.write.writeBatch
}

atomic.AddInt64(&l.numWritesInQueue, int64(-numDequeued))
Expand Down Expand Up @@ -582,29 +651,6 @@
}
}

// onFlush will never be called concurrently. The flushFn for the primaryWriter
// will only ever be called synchronously by the single-threaded writer goroutine
// and the flushFn for the secondaryWriter will only be called by the asynchronous
// goroutine (created by the single-threaded writer) when it calls Close() on the
// secondary (previously primary due to a hot-swap) writer during the reset.
//
// Note that both the primary and secondar's flushFn may be called during calls to
// Open() on the commitlog, but this takes place before the single-threaded writer
// is spawned which precludes it from occurring concurrently with either of the
// scenarios described above.
if len(writer.pendingFlushFns) == 0 {
l.metrics.flushDone.Inc(1)
return
}

for i := range writer.pendingFlushFns {
writer.pendingFlushFns[i](callbackResult{
eventType: flushEventType,
err: err,
})
writer.pendingFlushFns[i] = nil
}
writer.pendingFlushFns = writer.pendingFlushFns[:0]
l.metrics.flushDone.Inc(1)
}

Expand Down Expand Up @@ -689,18 +735,27 @@
l.writerState.secondary.writer = nil

l.metrics.errors.Inc(1)
l.metrics.openErrors.Inc(1)
l.metrics.openAsyncErrors.Inc(1)
} else {
l.metrics.openAsyncSuccess.Inc(1)
}

l.writerState.secondary.Done()
}()

if err = l.writerState.secondary.writer.Close(); err != nil {
closeStart := l.nowFn()
err = l.writerState.secondary.writer.Close()
l.metrics.closeLatency.RecordDuration(l.nowFn().Sub(closeStart))
if err != nil {
l.commitLogFailFn(err)
return
}

openStart := l.nowFn()
_, err = l.writerState.secondary.writer.Open()
openEnd := l.nowFn()
l.metrics.openLatency.RecordDuration(openEnd.Sub(openStart))
l.metrics.resetLatency.RecordDuration(openEnd.Sub(closeStart))
if err != nil {
l.commitLogFailFn(err)
return
Expand Down
30 changes: 29 additions & 1 deletion src/dbnode/persist/fs/commitlog/commit_log_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 10 additions & 9 deletions src/dbnode/persist/fs/commitlog/commit_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func snapshotCounterValue(

type mockCommitLogWriter struct {
openFn func() (persist.CommitLogFile, error)
writeFn func(ts.Series, ts.Datapoint, xtime.Unit, ts.Annotation) error
writeFn func(ts.Series, ts.Datapoint, xtime.Unit, ts.Annotation, callbackFn) error
flushFn func(sync bool) error
closeFn func() error
}
Expand All @@ -189,7 +189,7 @@ func newMockCommitLogWriter() *mockCommitLogWriter {
openFn: func() (persist.CommitLogFile, error) {
return persist.CommitLogFile{}, nil
},
writeFn: func(ts.Series, ts.Datapoint, xtime.Unit, ts.Annotation) error {
writeFn: func(ts.Series, ts.Datapoint, xtime.Unit, ts.Annotation, callbackFn) error {
return nil
},
flushFn: func(sync bool) error {
Expand All @@ -210,8 +210,9 @@ func (w *mockCommitLogWriter) Write(
datapoint ts.Datapoint,
unit xtime.Unit,
annotation ts.Annotation,
callback callbackFn,
) error {
return w.writeFn(series, datapoint, unit, annotation)
return w.writeFn(series, datapoint, unit, annotation, callback)
}

func (w *mockCommitLogWriter) Flush(sync bool) error {
Expand Down Expand Up @@ -794,7 +795,7 @@ func TestCommitLogFailOnWriteError(t *testing.T) {
commitLog := commitLogI.(*commitLog)
writer := newMockCommitLogWriter()

writer.writeFn = func(ts.Series, ts.Datapoint, xtime.Unit, ts.Annotation) error {
writer.writeFn = func(ts.Series, ts.Datapoint, xtime.Unit, ts.Annotation, callbackFn) error {
Copy link
Copy Markdown
Collaborator

@arnikola arnikola Jun 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wow you can do this? I thought you needed to explicitly set the args as _ callbackFn etc

return fmt.Errorf("an error")
}

Expand Down Expand Up @@ -886,9 +887,9 @@ func TestCommitLogFailOnOpenError(t *testing.T) {
require.True(t, ok)
require.Equal(t, int64(1), errors.Value())

openErrors, ok := snapshotCounterValue(scope, "commitlog.writes.open-errors")
openAsyncErrors, ok := snapshotCounterValue(scope, "commitlog.writes.open-async-errors")
require.True(t, ok)
require.Equal(t, int64(1), openErrors.Value())
require.Equal(t, int64(1), openAsyncErrors.Value())
}

func TestCommitLogFailOnFlushError(t *testing.T) {
Expand Down Expand Up @@ -1039,9 +1040,9 @@ func TestCommitLogBatchWriteDoesNotAddErroredOrSkippedSeries(t *testing.T) {

defer cleanup(t, opts)
commitLog := newTestCommitLog(t, opts)
finalized := 0
finalized := uint64(0)
finalizeFn := func(_ ts.WriteBatch) {
finalized++
atomic.AddUint64(&finalized, 1)
}

writes := ts.NewWriteBatch(4, ident.StringID("ns"), finalizeFn)
Expand Down Expand Up @@ -1100,5 +1101,5 @@ func TestCommitLogBatchWriteDoesNotAddErroredOrSkippedSeries(t *testing.T) {
}

assertCommitLogWritesByIterating(t, commitLog, expected)
require.Equal(t, 1, finalized)
require.Equal(t, 1, int(atomic.LoadUint64(&finalized)))
}
Loading