diff --git a/.github/workflows/natsstore.yaml b/.github/workflows/natsstore.yaml new file mode 100644 index 0000000..06d4a3f --- /dev/null +++ b/.github/workflows/natsstore.yaml @@ -0,0 +1,58 @@ +name: natsstore +on: + push: + paths: + - 'natsstore/**' + + pull_request: + paths: + - 'natsstore/**' + + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version: 'stable' + + - name: Install deps + working-directory: natsstore + shell: bash --noprofile --norc -x -eo pipefail {0} + run: | + go get -t ./... + go install honnef.co/go/tools/cmd/staticcheck@latest + go install github.com/client9/misspell/cmd/misspell@latest + + - name: Run linters + working-directory: natsstore + shell: bash --noprofile --norc -x -eo pipefail {0} + run: | + $(exit $(go fmt ./... | wc -l)) + go vet ./... + go vet ./test/... + staticcheck ./... + staticcheck ./test/... + find . -type f -name "*.go" | xargs misspell -error -locale US + + test: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version: 'stable' + + - name: Run tests + working-directory: natsstore + shell: bash --noprofile --norc -x -eo pipefail {0} + run: | + go test -v -count=1 ./test/... diff --git a/README.md b/README.md index ff03a97..0f408a0 100644 --- a/README.md +++ b/README.md @@ -12,15 +12,18 @@ You can use the library as a whole, or pick just what you need. # Utilities -| Module | Description | Docs | Version | -|----------------------|----------------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------| -| Core NATS Extensions | Core NATS extensions | [README.md](natsext/README.md) | [![Go Reference][natsext-image]][natsext-url] | -| `natscontext` | Allow connecting to NATS using NATS Contexts | [README.md](natscontext/README.md) | [![Go Reference][natscontext-image]][natscontext-url] | -| NATS System Client | NATS client for NATS monitoring APIs | [README.md](natssysclient/README.md) | [![Go Reference][natssysclient-image]][natssysclient-url] | +| Module | Description | Docs | Version | +|-----------------------|-------------------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------| +| Core NATS Extensions | Core NATS extensions | [README.md](natsext/README.md) | [![Go Reference][natsext-image]][natsext-url] | +| `natscontext` | Allow connecting to NATS using NATS Contexts | [README.md](natscontext/README.md) | [![Go Reference][natscontext-image]][natscontext-url] | +| `natsstore` | A usability wrapper around JetStream KV buckets | [README.md](natsstore/README.md) | [![Go Reference][natsnatsstore-image]][natsnatsstore-url] | +| NATS System Client | NATS client for NATS monitoring APIs | [README.md](natssysclient/README.md) | [![Go Reference][natssysclient-image]][natssysclient-url] | [natsext-url]: https://pkg.go.dev/github.com/synadia-io/orbit.go/natsext [natsext-image]: https://pkg.go.dev/badge/github.com/synadia-io/orbit.go/natsext.svg [natscontext-url]: https://pkg.go.dev/github.com/synadia-io/orbit.go/natscontext [natscontext-image]: https://pkg.go.dev/badge/github.com/synadia-io/orbit.go/natscontext.svg +[natsstore-url]: https://pkg.go.dev/github.com/synadia-io/orbit.go/natsstore +[natsstore-image]: https://pkg.go.dev/badge/github.com/synadia-io/orbit.go/natsstore.svg [natssysclient-url]: https://pkg.go.dev/github.com/synadia-io/orbit.go/natssysclient [natssysclient-image]: https://pkg.go.dev/badge/github.com/synadia-io/orbit.go/natssysclient.svg \ No newline at end of file diff --git a/natscontext/go.work.sum b/natscontext/go.work.sum index 95f6e41..9ac00f0 100644 --- a/natscontext/go.work.sum +++ b/natscontext/go.work.sum @@ -13,9 +13,11 @@ golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.26.0 h1:WEQa6V3Gja/BhNxg540hBip/kkaYtRg3cxg4oXSw4AU= golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E= +golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= diff --git a/natsext/go.work.sum b/natsext/go.work.sum index 07e61c1..d2410aa 100644 --- a/natsext/go.work.sum +++ b/natsext/go.work.sum @@ -1,16 +1,10 @@ github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= -github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= -github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw= -github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= -github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= -golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= -golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= @@ -18,15 +12,15 @@ golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= +golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= diff --git a/natsstore/README.md b/natsstore/README.md new file mode 100644 index 0000000..16c2792 --- /dev/null +++ b/natsstore/README.md @@ -0,0 +1,90 @@ +# NATS JetStream Store + +[License-Url]: https://www.apache.org/licenses/LICENSE-2.0 +[License-Image]: https://img.shields.io/badge/License-Apache2-blue.svg +[ReportCard-Url]: https://goreportcard.com/report/github.com/synadia-io/orbit.go/natsstore +[ReportCard-Image]: https://goreportcard.com/badge/github.com/synadia-io/orbit.go/natsstore +[Build-Status-Url]: https://github.com/synadia-io/orbit.go/actions/workflows/natsstore.yaml +[Build-Status-Image]: https://github.com/synadia-io/orbit.go/actions/workflows/natsstore.yaml/badge.svg?branch=main +[GoDoc-Url]: https://pkg.go.dev/github.com/synadia-io/orbit.go/natsstore +[GoDoc-Image]: https://pkg.go.dev/badge/github.com/synadia-io/orbit.go/natsstore.svg + +[![License][License-Image]][License-Url] +[![Go Reference][GoDoc-Image]][GoDoc-Url] +[![Build Status][Build-Status-Image]][Build-Status-Url] +[![Go Report Card][ReportCard-Image]][ReportCard-Url] + +Nats JetStream Store library provides a client to make it easier to perform common operations on a JetStream KV bucket. + +## Installation + +```bash +go get github.com/synadia-io/orbit.go/natsstore +``` + +## Usage + +The client serves as a wrapper around a KV bucket. However, it also requires access to the underlying +JetStream stream to support low-level purging. Therefor, the client will look for the bucket and underlying stream. + +```go +// connect to nats +nc, err := nats.Connect(nats.DefaultURL) +if err != nil { + // handle error +} +defer nc.Close() + +// get the jetstream client +js, err := jetstream.New(nc) +if err != nil { + // handle error +} + +// get a store wrapping an existing JetStream KV bucket, using a json +// codec to persist entries +store, err = natsstore.NewKeyValueStore(js, "myBucket", natsstore.NewJsonCodec()) +if err != nil { + // handle error +} + +// add a new entry to the store but fail if it already exists +_, err := store.Apply(context.Background(), "entries.entry_one", func(entry natsstore.Entry) ([]byte, error) { + if !entry.IsNew() { + return nil, fmt.Errorf("entry already exists") + } + + return entry.Encode("Hello World") +}) +if err != nil { + // handle error +} + +// update an entry in the store, but fail if it does not exist +_, err := store.Apply(context.Background(), "entries.entry_one", func(entry natsstore.Entry) ([]byte, error) { + if entry.IsNew() { + return nil, fmt.Errorf("entry does not exist") + } + + return entry.Encode("Goodbye, cruel world") +}) +if err != nil { + // handle error +} + +// list the entries in the store +fmt.Println("Entries:") +err := store.List(context.Background(), "entries.>", func(entry natsstore.Entry, hasMore bool) error { + fmt.Printf(" - %s\n", entry.Key()) + return nil +}) +if err != nil { + // handle error +} +``` + +### Codecs +The store entries are stored as byte slices, but the store can be configured with a codec to encode and decode the +entries. The library provides a `Json` codec which can optionally be wrapped by a `Secured` codec. The latter will +encrypt/decrypt all entry data written/read to/from the store. You can also implement your own codec by implementing +the `Codec` interface and passing it when constructing the store. diff --git a/natsstore/codec.go b/natsstore/codec.go new file mode 100644 index 0000000..d997081 --- /dev/null +++ b/natsstore/codec.go @@ -0,0 +1,7 @@ +package natsstore + +// Codec provides a way to encode and decode data +type Codec interface { + Encode(any) ([]byte, error) + Decode([]byte, any) error +} diff --git a/natsstore/codec_json.go b/natsstore/codec_json.go new file mode 100644 index 0000000..20b848c --- /dev/null +++ b/natsstore/codec_json.go @@ -0,0 +1,17 @@ +package natsstore + +import "encoding/json" + +func NewJsonCodec() Codec { + return &jsonCodec{} +} + +type jsonCodec struct{} + +func (c *jsonCodec) Encode(a any) ([]byte, error) { + return json.Marshal(a) +} + +func (c *jsonCodec) Decode(data []byte, a any) error { + return json.Unmarshal(data, a) +} diff --git a/natsstore/codec_secure.go b/natsstore/codec_secure.go new file mode 100644 index 0000000..cd7fc01 --- /dev/null +++ b/natsstore/codec_secure.go @@ -0,0 +1,36 @@ +package natsstore + +import "github.com/nats-io/nkeys" + +func Secured(codec Codec, xkey nkeys.KeyPair) Codec { + pk, _ := xkey.PublicKey() + return &secureCodec{ + codec: codec, + xkey: xkey, + pk: pk, + } +} + +type secureCodec struct { + codec Codec + xkey nkeys.KeyPair + pk string +} + +func (s *secureCodec) Encode(a any) ([]byte, error) { + b, err := s.codec.Encode(a) + if err != nil { + return nil, err + } + + return s.xkey.Seal(b, s.pk) +} + +func (s *secureCodec) Decode(bytes []byte, a any) error { + b, err := s.xkey.Open(bytes, s.pk) + if err != nil { + return err + } + + return s.codec.Decode(b, a) +} diff --git a/natsstore/entry.go b/natsstore/entry.go new file mode 100644 index 0000000..5449807 --- /dev/null +++ b/natsstore/entry.go @@ -0,0 +1,30 @@ +package natsstore + +func newEntry(codec Codec) Entry { + return Entry{ + Revision: 0, + codec: codec, + } +} + +type Entry struct { + Key string + Data []byte + Revision uint64 + codec Codec +} + +// IsNew returns true if the entry isn't stored +func (e Entry) IsNew() bool { + return e.Revision == 0 && len(e.Data) == 0 +} + +// Decode uses the codec to decode the data in the entry yet +func (e Entry) Decode(a any) error { + return e.codec.Decode(e.Data, a) +} + +// Encode uses the codec to encode the data in the entry +func (e Entry) Encode(a any) ([]byte, error) { + return e.codec.Encode(a) +} diff --git a/natsstore/go.mod b/natsstore/go.mod new file mode 100644 index 0000000..f82d7b8 --- /dev/null +++ b/natsstore/go.mod @@ -0,0 +1,14 @@ +module github.com/synadia-io/orbit.go/natsstore + +go 1.24.1 + +require github.com/nats-io/nats.go v1.40.1 + +require ( + github.com/klauspost/compress v1.18.0 // indirect + github.com/nats-io/nkeys v0.4.10 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.36.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/text v0.23.0 // indirect +) diff --git a/natsstore/go.sum b/natsstore/go.sum new file mode 100644 index 0000000..6870ca5 --- /dev/null +++ b/natsstore/go.sum @@ -0,0 +1,14 @@ +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/nats-io/nats.go v1.40.1 h1:MLjDkdsbGUeCMKFyCFoLnNn/HDTqcgVa3EQm+pMNDPk= +github.com/nats-io/nats.go v1.40.1/go.mod h1:wV73x0FSI/orHPSYoyMeJB+KajMDoWyXmFaRrrYaaTo= +github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc= +github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= diff --git a/natsstore/go.work b/natsstore/go.work new file mode 100644 index 0000000..5c3fdc7 --- /dev/null +++ b/natsstore/go.work @@ -0,0 +1,6 @@ +go 1.24.1 + +use ( + . + ./test +) diff --git a/natsstore/kv.go b/natsstore/kv.go new file mode 100644 index 0000000..df0b8b2 --- /dev/null +++ b/natsstore/kv.go @@ -0,0 +1,156 @@ +package natsstore + +import ( + "context" + "errors" + "fmt" + "github.com/nats-io/nats.go/jetstream" +) + +func NewKeyValueStore(js jetstream.JetStream, bucket string, codec Codec) (Store, error) { + kv, err := js.KeyValue(context.Background(), bucket) + if err != nil { + return nil, err + } + + stream, err := js.Stream(context.Background(), fmt.Sprintf("KV_%s", bucket)) + if err != nil { + return nil, err + } + + return &KeyValueStore{ + stream: stream, + kv: kv, + codec: codec, + }, nil +} + +type KeyValueStore struct { + stream jetstream.Stream + kv jetstream.KeyValue + codec Codec +} + +func (s *KeyValueStore) Codec() Codec { + return s.codec +} + +func (s *KeyValueStore) List(ctx context.Context, filter string, handler EntryHandler) error { + watcher, err := s.kv.Watch(ctx, filter, jetstream.IgnoreDeletes()) + if err != nil { + return err + } + defer watcher.Stop() + + for v := range watcher.Updates() { + if v == nil { + break + } + + entry := newEntry(s.codec) + entry.Revision = v.Revision() + entry.Data = v.Value() + entry.Key = v.Key() + + if err := handler(entry, true); err != nil { + return err + } + } + + return handler(newEntry(s.codec), false) +} + +func (s *KeyValueStore) Apply(ctx context.Context, key string, mutator Mutator) (uint64, error) { + for { + ekve, err := s.kv.Get(ctx, key) + if err != nil { + if !errors.Is(err, jetstream.ErrKeyNotFound) { + return 0, err + } + + ekve = nil + } + + entry := newEntry(s.codec) + if ekve != nil { + entry.Key = ekve.Key() + entry.Data = ekve.Value() + entry.Revision = ekve.Revision() + } + + res, err := mutator(entry) + // in case of an error during execution of the mutator, we will not retry but return the error instead + if err != nil { + return 0, err + } + + // -- if the result is nul, we will delete the entry from the kv store + if res == nil && entry.Revision != 0 { + return entry.Revision, s.kv.Delete(ctx, key) + } + + // -- if we did get a result, we will need to persist the entry + // -- if the revision was 0, we will need to create a new entry + if entry.Revision == 0 { + rev, err := s.kv.Create(ctx, key, res) + if err != nil { + if errors.Is(err, jetstream.ErrKeyExists) { + continue + } + + return 0, err + } + return rev, nil + } else { + rev, err := s.kv.Update(ctx, key, res, entry.Revision) + if err != nil { + if errors.Is(err, jetstream.ErrKeyExists) { + continue + } + + return 0, err + } + return rev, nil + } + } +} + +func (s *KeyValueStore) Exists(ctx context.Context, key string) (bool, error) { + entry, err := s.kv.Get(ctx, key) + if err != nil { + if errors.Is(err, jetstream.ErrKeyNotFound) { + return false, nil + } + + return false, err + } + + return entry.Operation() == jetstream.KeyValuePut, nil +} + +func (s *KeyValueStore) Get(ctx context.Context, key string) (*Entry, bool, error) { + entry := newEntry(s.codec) + ekve, err := s.kv.Get(ctx, key) + if err != nil { + if errors.Is(err, jetstream.ErrKeyNotFound) { + return nil, false, nil + } + + return nil, false, err + } + + entry.Key = ekve.Key() + entry.Data = ekve.Value() + entry.Revision = ekve.Revision() + + return &entry, true, nil +} + +func (s *KeyValueStore) Delete(ctx context.Context, key string) error { + return s.kv.Delete(ctx, key) +} + +func (s *KeyValueStore) Purge(ctx context.Context, key string) error { + kvKey := fmt.Sprintf("$KV.%s.%s", s.kv.Bucket(), key) + return s.stream.Purge(ctx, jetstream.WithPurgeSubject(kvKey)) +} diff --git a/natsstore/store.go b/natsstore/store.go new file mode 100644 index 0000000..74f15c2 --- /dev/null +++ b/natsstore/store.go @@ -0,0 +1,41 @@ +package natsstore + +import ( + "context" +) + +// EntryHandler is a function that is called for each entry in the store that matches the filter. +// A handler must be able to process 'new' entries. These entries have not been stored yet, have a revision of 0 and +// no data attached to it. The hasMore parameter is true if there are more entries to process. +type EntryHandler func(entry Entry, hasMore bool) error + +// Mutator is a function that can mutate an entry. +// It returns the new data for the entry or an error if the mutation failed. If the data is nil, the +// entry will be deleted from the store. +type Mutator func(entry Entry) ([]byte, error) + +// Store represents a key-value store that can be used to store and retrieve entries. +type Store interface { + // List calls the handler for each entry in the store that matches the filter. + // it returns an error if the operation failed. + List(ctx context.Context, filter string, handler EntryHandler) error + + // Apply applies the mutator to the entry with the given key. + // If the entry doesn't exist, it will be created. If during the mutation the entry is modified by another + // process, the mutator will be reprocessed with the new entry. If the mutator returns nil, the entry will be + // deleted. + Apply(ctx context.Context, key string, mutator Mutator) (uint64, error) + + // Exists returns true if the entry with the given key exists in the store. + Exists(ctx context.Context, key string) (bool, error) + + // Get returns the entry with the given key. If the entry doesn't exist, the second return value is false. + Get(ctx context.Context, key string) (*Entry, bool, error) + + // Delete puts a delete marker on the given key. It will not be deleted but instead marked for deletion and being + // ignored by the List method. + Delete(ctx context.Context, key string) error + + // Purge deletes the key from the underlying stream. After doing a purge, the key will truly be deleted. + Purge(ctx context.Context, key string) error +} diff --git a/natsstore/test/go.mod b/natsstore/test/go.mod new file mode 100644 index 0000000..03c8de4 --- /dev/null +++ b/natsstore/test/go.mod @@ -0,0 +1,22 @@ +module github.com/synadia-io/orbit.go/natsstore/test + +go 1.24.1 + +require ( + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-task/slim-sprig/v3 v3.0.0 // indirect + github.com/google/go-tpm v0.9.3 // indirect + github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/minio/highwayhash v1.0.3 // indirect + github.com/nats-io/jwt/v2 v2.7.3 // indirect + github.com/nats-io/nats-server/v2 v2.11.0 // indirect + github.com/nats-io/nkeys v0.4.10 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/onsi/ginkgo/v2 v2.23.3 // indirect + go.uber.org/automaxprocs v1.6.0 // indirect + golang.org/x/crypto v0.36.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/time v0.11.0 // indirect + golang.org/x/tools v0.30.0 // indirect +) diff --git a/natsstore/test/go.sum b/natsstore/test/go.sum new file mode 100644 index 0000000..c8ab925 --- /dev/null +++ b/natsstore/test/go.sum @@ -0,0 +1,33 @@ +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc= +github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= +github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= +github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= +github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= +github.com/nats-io/nats-server/v2 v2.11.0 h1:fdwAT1d6DZW/4LUz5rkvQUe5leGEwjjOQYntzVRKvjE= +github.com/nats-io/nats-server/v2 v2.11.0/go.mod h1:leXySghbdtXSUmWem8K9McnJ6xbJOb0t9+NQ5HTRZjI= +github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc= +github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/onsi/ginkgo/v2 v2.23.3 h1:edHxnszytJ4lD9D5Jjc4tiDkPBZ3siDeJJkUZJJVkp0= +github.com/onsi/ginkgo/v2 v2.23.3/go.mod h1:zXTP6xIp3U8aVuXN8ENK9IXRaTjFnpVB9mGmaSRvxnM= +go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= +go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= +golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +golang.org/x/tools v0.30.0 h1:BgcpHewrV5AUp2G9MebG4XPFI1E2W41zU1SaqVA9vJY= +golang.org/x/tools v0.30.0/go.mod h1:c347cR/OJfw5TI+GfX7RUPNMdDRRbjvYTS0jPyvsVtY= diff --git a/natsstore/test/kv_test.go b/natsstore/test/kv_test.go new file mode 100644 index 0000000..3cb7a15 --- /dev/null +++ b/natsstore/test/kv_test.go @@ -0,0 +1,272 @@ +package test + +import ( + "context" + "errors" + "github.com/nats-io/nats.go/jetstream" + "github.com/nats-io/nkeys" + "github.com/nats-io/nuid" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/synadia-io/orbit.go/natsstore" +) + +type data struct { + Value string +} + +func testTreeGenerator(codec natsstore.Codec) func() { + return func() { + var bucket string + var store natsstore.Store + + BeforeEach(func() { + bucket = nuid.Next() + + var err error + _, err = js.CreateKeyValue(context.Background(), jetstream.KeyValueConfig{ + Bucket: bucket, + }) + Expect(err).To(BeNil()) + + store, err = natsstore.NewKeyValueStore(js, bucket, codec) + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + err := js.DeleteKeyValue(context.Background(), bucket) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("listing entries", func() { + Context("when there are no entries", func() { + It("should return an empty list", func() { + err := store.List(context.Background(), ">", func(entry natsstore.Entry, hasMore bool) error { + Expect(entry.IsNew()).To(BeTrue()) + Expect(hasMore).To(BeFalse()) + return nil + }) + Expect(err).ToNot(HaveOccurred()) + }) + }) + + Context("when there are entries", func() { + BeforeEach(func() { + _, err := store.Apply(context.Background(), "meta.store.user-a.entry1", func(entry natsstore.Entry) ([]byte, error) { + return []byte("data1"), nil + }) + Expect(err).ToNot(HaveOccurred()) + + _, err = store.Apply(context.Background(), "meta.store.user-b.entry2", func(entry natsstore.Entry) ([]byte, error) { + return []byte("data2"), nil + }) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should error if no filter is provided", func() { + err := store.List(context.Background(), "", func(entry natsstore.Entry, hasMore bool) error { + Fail("Should not have called the handler") + return nil + }) + Expect(err).To(HaveOccurred()) + }) + It("should return entries matching the filter", func() { + resultCount := 0 + err := store.List(context.Background(), "meta.store.user-b.*", func(entry natsstore.Entry, hasMore bool) error { + if !entry.IsNew() { + resultCount++ + } + + return nil + }) + Expect(err).ToNot(HaveOccurred()) + Expect(resultCount).To(Equal(1)) + }) + }) + }) + Describe("checking if an entry exists", func() { + When("the entry does not exist", func() { + It("should return false", func() { + fnd, err := store.Exists(context.Background(), "meta.store.user-a.unknown") + Expect(err).ToNot(HaveOccurred()) + Expect(fnd).To(BeFalse()) + }) + }) + When("the entry exists", func() { + BeforeEach(func(ctx context.Context) { + _, err := store.Apply(ctx, "meta.store.user-a.entry1", func(entry natsstore.Entry) ([]byte, error) { + return []byte("data1"), nil + }) + Expect(err).ToNot(HaveOccurred()) + }) + AfterEach(func(ctx context.Context) { + var keys []string + err := store.List(context.Background(), "meta.store.*.*", func(entry natsstore.Entry, hasMore bool) error { + keys = append(keys, entry.Key) + return nil + }) + Expect(err).ToNot(HaveOccurred()) + + for _, key := range keys { + err = store.Purge(ctx, key) + Expect(err).ToNot(HaveOccurred()) + } + }) + + It("should return true", func() { + fnd, err := store.Exists(context.Background(), "meta.store.user-a.entry1") + Expect(err).ToNot(HaveOccurred()) + Expect(fnd).To(BeTrue()) + }) + }) + }) + Describe("applying a mutation", func() { + When("the mutator results in an error", func() { + It("should return an error", func() { + _, err := store.Apply(context.Background(), "meta.store.user-a.entry1", func(entry natsstore.Entry) ([]byte, error) { + return nil, errors.New("error") + }) + Expect(err).To(HaveOccurred()) + }) + }) + + When("the entry does not exist", func() { + It("should create the entry", func() { + _, err := store.Apply(context.Background(), "meta.store.user-a.unknown", func(entry natsstore.Entry) ([]byte, error) { + if !entry.IsNew() { + Fail("Entry should be new") + } + return nil, nil + }) + Expect(err).ToNot(HaveOccurred()) + }) + }) + + When("the entry exists", func() { + BeforeEach(func() { + _, err := store.Apply(context.Background(), "meta.store.user-a.entry1", func(entry natsstore.Entry) ([]byte, error) { + return entry.Encode(data{Value: "data1"}) + }) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should update the entry", func() { + _, err := store.Apply(context.Background(), "meta.store.user-a.entry1", func(entry natsstore.Entry) ([]byte, error) { + return entry.Encode(data{Value: "data2"}) + }) + Expect(err).ToNot(HaveOccurred()) + + var entry *natsstore.Entry + entry, _, err = store.Get(context.Background(), "meta.store.user-a.entry1") + + var d data + _ = entry.Decode(&d) + + Expect(err).ToNot(HaveOccurred()) + Expect(d.Value).To(Equal("data2")) + }) + }) + }) + Describe("retrieving an entry", func() { + When("the entry does not exist", func() { + It("should return nil, false, and no error", func() { + entry, fnd, err := store.Get(context.Background(), "meta.store.user-a.unknown") + + Expect(err).NotTo(HaveOccurred()) + Expect(entry).To(BeNil()) + Expect(fnd).To(BeFalse()) + }) + }) + When("the entry exists", func() { + BeforeEach(func() { + _, err := store.Apply(context.Background(), "meta.store.user-a.entry1", func(entry natsstore.Entry) ([]byte, error) { + return entry.Encode(data{Value: "data1"}) + }) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should return the entry", func() { + entry, fnd, err := store.Get(context.Background(), "meta.store.user-a.entry1") + var d data + _ = entry.Decode(&d) + + Expect(err).NotTo(HaveOccurred()) + Expect(entry.Revision).ToNot(Equal(uint64(0))) + Expect(entry.Key).To(Equal("meta.store.user-a.entry1")) + Expect(fnd).To(BeTrue()) + Expect(d.Value).To(Equal("data1")) + }) + }) + }) + Describe("removing an entry", func() { + When("the entry does not exist", func() { + It("should return no error", func() { + err := store.Delete(context.Background(), "meta.store.user-a.unknown") + Expect(err).NotTo(HaveOccurred()) + }) + }) + When("the entry exists", func() { + BeforeEach(func() { + _, err := store.Apply(context.Background(), "meta.store.user-a.entry1", func(entry natsstore.Entry) ([]byte, error) { + return entry.Encode(data{Value: "data1"}) + }) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should remove the entry", func() { + err := store.Delete(context.Background(), "meta.store.user-a.entry1") + Expect(err).NotTo(HaveOccurred()) + + _, fnd, err := store.Get(context.Background(), "meta.store.user-a.entry1") + Expect(err).NotTo(HaveOccurred()) + Expect(fnd).To(BeFalse()) + }) + }) + }) + Describe("purging an entry", func() { + When("the entry does not exist", func() { + It("should return no error", func() { + err := store.Purge(context.Background(), "meta.store.user-a.unknown") + Expect(err).NotTo(HaveOccurred()) + }) + }) + When("the entry exists", func() { + BeforeEach(func(ctx context.Context) { + _, err := store.Apply(context.Background(), "meta.store.user-a.entry1", func(entry natsstore.Entry) ([]byte, error) { + return entry.Encode(data{Value: "data1"}) + }) + Expect(err).ToNot(HaveOccurred()) + }) + AfterEach(func(ctx context.Context) { + var keys []string + err := store.List(context.Background(), "meta.store.*.*", func(entry natsstore.Entry, hasMore bool) error { + keys = append(keys, entry.Key) + return nil + }) + Expect(err).ToNot(HaveOccurred()) + + for _, key := range keys { + err = store.Purge(ctx, key) + Expect(err).ToNot(HaveOccurred()) + } + }) + + It("should remove the entry", func() { + err := store.Purge(context.Background(), "meta.store.user-a.entry1") + Expect(err).NotTo(HaveOccurred()) + + fnd, err := store.Exists(context.Background(), "meta.store.user-a.entry1") + Expect(err).NotTo(HaveOccurred()) + Expect(fnd).To(BeFalse()) + }) + }) + }) + } +} + +var _ = Describe("KeyValue", func() { + var xkey, _ = nkeys.CreateCurveKeys() + + Context("using the json codec", testTreeGenerator(natsstore.NewJsonCodec())) + Context("using a secured json codec", testTreeGenerator(natsstore.Secured(natsstore.NewJsonCodec(), xkey))) +}) diff --git a/natsstore/test/store_suite_test.go b/natsstore/test/store_suite_test.go new file mode 100644 index 0000000..a84554a --- /dev/null +++ b/natsstore/test/store_suite_test.go @@ -0,0 +1,53 @@ +package test + +import ( + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats-server/v2/test" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "os" + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var jsDir string +var srv *server.Server +var nc *nats.Conn +var js jetstream.JetStream + +func TestStorage(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Storage Suite") +} + +var _ = BeforeSuite(func() { + var err error + + jsDir := os.TempDir() + + opts := test.DefaultTestOptions + opts.Port = -1 + opts.JetStream = true + opts.StoreDir = jsDir + + srv := test.RunServer(&opts) + nc, err := nats.Connect(srv.ClientURL()) + Expect(err).NotTo(HaveOccurred()) + + js, err = jetstream.New(nc) + Expect(err).ToNot(HaveOccurred()) +}) + +var _ = AfterSuite(func() { + defer os.RemoveAll(jsDir) + + if nc != nil { + nc.Close() + } + + if srv != nil { + srv.Shutdown() + } +}) diff --git a/natssysclient/go.work.sum b/natssysclient/go.work.sum index 1004e96..3e9eeaa 100644 --- a/natssysclient/go.work.sum +++ b/natssysclient/go.work.sum @@ -1,16 +1,18 @@ github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= -github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= -github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= +go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= +golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=