Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 100 additions & 30 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ func (e *Entry) IsExpired() bool {
return time.Since(e.Timestamp) > e.TTL
}

// Cache is a thread-safe in-memory cache with TTL.
// Cache is a thread-safe in-memory cache with TTL and per-register storage.
type Cache struct {
mu sync.RWMutex
entries map[string]*Entry
defaultTTL time.Duration
keepStale bool // when true, cleanup won't delete expired entries

// For request coalescing
inflight map[string]*inflightRequest
Expand All @@ -41,10 +42,12 @@ type inflightRequest struct {
}

// New creates a new cache with the specified default TTL.
func New(defaultTTL time.Duration) *Cache {
// If keepStale is true, expired entries are retained for stale serving.
func New(defaultTTL time.Duration, keepStale bool) *Cache {
c := &Cache{
entries: make(map[string]*Entry),
defaultTTL: defaultTTL,
keepStale: keepStale,
inflight: make(map[string]*inflightRequest),
done: make(chan struct{}),
}
Expand All @@ -60,9 +63,14 @@ func (c *Cache) Close() {
close(c.done)
}

// Key generates a cache key from request parameters.
func Key(slaveID byte, functionCode byte, address uint16, quantity uint16) string {
return fmt.Sprintf("%d:%d:%d:%d", slaveID, functionCode, address, quantity)
// RegKey generates a cache key for a single register or coil.
func RegKey(slaveID byte, functionCode byte, address uint16) string {
return fmt.Sprintf("%d:%d:%d", slaveID, functionCode, address)
}

// RangeKey generates a coalescing key for a request range.
func RangeKey(slaveID byte, functionCode byte, startAddr uint16, quantity uint16) string {
return fmt.Sprintf("%d:%d:%d:%d", slaveID, functionCode, startAddr, quantity)
}

// Get retrieves a value from the cache.
Expand Down Expand Up @@ -100,11 +108,6 @@ func (c *Cache) GetStale(key string) ([]byte, bool) {

// Set stores a value in the cache with the default TTL.
func (c *Cache) Set(key string, data []byte) {
c.SetWithTTL(key, data, c.defaultTTL)
}

// SetWithTTL stores a value in the cache with a specific TTL.
func (c *Cache) SetWithTTL(key string, data []byte, ttl time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -115,7 +118,7 @@ func (c *Cache) SetWithTTL(key string, data []byte, ttl time.Duration) {
c.entries[key] = &Entry{
Data: dataCopy,
Timestamp: time.Now(),
TTL: ttl,
TTL: c.defaultTTL,
}
}

Expand All @@ -126,32 +129,95 @@ func (c *Cache) Delete(key string) {
delete(c.entries, key)
}

// GetOrFetch retrieves a value from the cache or fetches it using the provided function.
// This implements request coalescing - multiple concurrent requests for the same key
// will share a single fetch operation.
// Returns the data, a boolean indicating if it was a cache hit, and any error.
func (c *Cache) GetOrFetch(ctx context.Context, key string, fetch func(context.Context) ([]byte, error)) ([]byte, bool, error) {
// Check cache first
if data, ok := c.Get(key); ok {
return data, true, nil
// GetRange retrieves all values for a contiguous register range.
// Returns the per-register/coil values and true only if ALL are cached and fresh.
func (c *Cache) GetRange(slaveID byte, functionCode byte, startAddr uint16, quantity uint16) ([][]byte, bool) {
c.mu.RLock()
defer c.mu.RUnlock()

values := make([][]byte, quantity)
for i := uint16(0); i < quantity; i++ {
key := RegKey(slaveID, functionCode, startAddr+i)
entry, ok := c.entries[key]
if !ok || entry.IsExpired() {
Comment thread
tma marked this conversation as resolved.
return nil, false
}
data := make([]byte, len(entry.Data))
copy(data, entry.Data)
values[i] = data
}
return values, true
}

// GetRangeStale retrieves all values for a contiguous register range, ignoring TTL.
// Returns the per-register/coil values and true only if ALL are present (even if expired).
func (c *Cache) GetRangeStale(slaveID byte, functionCode byte, startAddr uint16, quantity uint16) ([][]byte, bool) {
c.mu.RLock()
defer c.mu.RUnlock()

values := make([][]byte, quantity)
for i := uint16(0); i < quantity; i++ {
key := RegKey(slaveID, functionCode, startAddr+i)
entry, ok := c.entries[key]
if !ok {
Comment thread
tma marked this conversation as resolved.
return nil, false
}
data := make([]byte, len(entry.Data))
copy(data, entry.Data)
values[i] = data
}
return values, true
}

// SetRange stores individual values for a contiguous register range.
// All entries are stored with the same timestamp for consistency.
func (c *Cache) SetRange(slaveID byte, functionCode byte, startAddr uint16, values [][]byte) {
c.mu.Lock()
defer c.mu.Unlock()

now := time.Now()
for i, v := range values {
key := RegKey(slaveID, functionCode, startAddr+uint16(i))
dataCopy := make([]byte, len(v))
copy(dataCopy, v)
c.entries[key] = &Entry{
Data: dataCopy,
Timestamp: now,
TTL: c.defaultTTL,
}
}
}

// DeleteRange removes all entries for a contiguous register range.
func (c *Cache) DeleteRange(slaveID byte, functionCode byte, startAddr uint16, quantity uint16) {
c.mu.Lock()
defer c.mu.Unlock()

// Check if there's already an in-flight request
for i := uint16(0); i < quantity; i++ {
key := RegKey(slaveID, functionCode, startAddr+i)
delete(c.entries, key)
}
}

// Coalesce ensures only one fetch runs for a given key at a time.
// Other callers with the same key wait for and share the first caller's result.
// This handles request coalescing only — it does not interact with cache storage.
func (c *Cache) Coalesce(ctx context.Context, key string, fetch func(context.Context) ([]byte, error)) ([]byte, error) {
c.inflightMu.Lock()
if req, ok := c.inflight[key]; ok {
c.inflightMu.Unlock()
// Wait for the in-flight request to complete
select {
case <-req.done:
if req.err != nil {
return nil, false, req.err
return nil, req.err
}
// Return a copy
data := make([]byte, len(req.result))
copy(data, req.result)
return data, false, nil
return data, nil
case <-ctx.Done():
return nil, false, ctx.Err()
return nil, ctx.Err()
}
}

Expand All @@ -165,22 +231,23 @@ func (c *Cache) GetOrFetch(ctx context.Context, key string, fetch func(context.C
// Fetch the data
data, err := fetch(ctx)

// Store result
// Store result for waiters
req.result = data
req.err = err

// Cache successful results
if err == nil {
c.Set(key, data)
}

// Clean up and notify waiters
c.inflightMu.Lock()
delete(c.inflight, key)
c.inflightMu.Unlock()
close(req.done)

return data, false, err
if err != nil {
return nil, err
}

result := make([]byte, len(data))
copy(result, data)
return result, nil
}

// cleanup periodically removes expired entries.
Expand All @@ -193,6 +260,9 @@ func (c *Cache) cleanup() {
case <-c.done:
return
case <-ticker.C:
if c.keepStale {
continue
}
Comment thread
tma marked this conversation as resolved.
Outdated
c.mu.Lock()
for key, entry := range c.entries {
if entry.IsExpired() {
Expand Down
Loading
Loading