cache: use badgerdb (#15)
This commit is contained in:
55
cache/cache.go
vendored
55
cache/cache.go
vendored
@@ -1,42 +1,75 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/dgraph-io/badger/v3"
|
||||
"github.com/l3uddz/nabarr/logger"
|
||||
"github.com/lefelys/state"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/xujiajun/nutsdb"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
log zerolog.Logger
|
||||
st state.State
|
||||
|
||||
db *nutsdb.DB
|
||||
db *badger.DB
|
||||
}
|
||||
|
||||
func New(path string) (*Client, error) {
|
||||
db, err := nutsdb.Open(nutsdb.Options{
|
||||
Dir: path,
|
||||
EntryIdxMode: nutsdb.HintKeyValAndRAMIdxMode,
|
||||
SegmentSize: 8 * 1024 * 1024,
|
||||
NodeNum: 1,
|
||||
RWMode: nutsdb.FileIO,
|
||||
SyncEnable: true,
|
||||
StartFileLoadingMode: nutsdb.MMap,
|
||||
})
|
||||
opts := badger.DefaultOptions(path)
|
||||
opts.Logger = nil
|
||||
db, err := badger.Open(opts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open: %w", err)
|
||||
}
|
||||
|
||||
log := logger.New("trace").With().Logger()
|
||||
|
||||
// start cleaner
|
||||
st, tail := state.WithShutdown()
|
||||
ticker := time.NewTicker(6 * time.Hour)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-tail.End():
|
||||
ticker.Stop()
|
||||
tail.Done()
|
||||
return
|
||||
case <-ticker.C:
|
||||
// clean cache
|
||||
for {
|
||||
if db.RunValueLogGC(0.5) == nil {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
log.Debug().Msg("Cleaned cache")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return &Client{
|
||||
log: log,
|
||||
st: st,
|
||||
|
||||
db: db,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *Client) Close() error {
|
||||
// shutdown cleaner
|
||||
if c.st != nil {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||
defer cancel()
|
||||
if err := c.st.Shutdown(ctx); err != nil {
|
||||
c.log.Error().
|
||||
Err(err).
|
||||
Msg("Failed shutting down cache cleaner gracefully")
|
||||
}
|
||||
}
|
||||
|
||||
// close cache
|
||||
return c.db.Close()
|
||||
}
|
||||
|
||||
6
cache/delete.go
vendored
6
cache/delete.go
vendored
@@ -2,12 +2,12 @@ package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/xujiajun/nutsdb"
|
||||
"github.com/dgraph-io/badger/v3"
|
||||
)
|
||||
|
||||
func (c *Client) Delete(bucket string, key string) error {
|
||||
if err := c.db.Update(func(tx *nutsdb.Tx) error {
|
||||
return tx.Delete(bucket, []byte(key))
|
||||
if err := c.db.Update(func(txn *badger.Txn) error {
|
||||
return txn.Delete([]byte(fmt.Sprintf("%s_%v", bucket, key)))
|
||||
}); err != nil {
|
||||
return fmt.Errorf("%v: %v; delete: %w", bucket, key, err)
|
||||
}
|
||||
|
||||
2
cache/delete_test.go
vendored
2
cache/delete_test.go
vendored
@@ -35,7 +35,7 @@ func TestClient_Delete(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
c := &Client{
|
||||
log: tt.fields.log,
|
||||
db: newDb(t, "delete"),
|
||||
db: newDb(t),
|
||||
}
|
||||
if err := c.Delete(tt.args.bucket, tt.args.key); (err != nil) != tt.wantErr {
|
||||
t.Errorf("Delete() error = %v, wantErr %v", err, tt.wantErr)
|
||||
|
||||
27
cache/get.go
vendored
27
cache/get.go
vendored
@@ -2,17 +2,34 @@ package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/xujiajun/nutsdb"
|
||||
"github.com/dgraph-io/badger/v3"
|
||||
)
|
||||
|
||||
func (c *Client) Get(bucket string, key string) ([]byte, error) {
|
||||
var value []byte
|
||||
if err := c.db.View(func(tx *nutsdb.Tx) error {
|
||||
e, err := tx.Get(bucket, []byte(key))
|
||||
|
||||
if err := c.db.View(func(txn *badger.Txn) error {
|
||||
// get key
|
||||
v, err := txn.Get([]byte(fmt.Sprintf("%s_%s", bucket, key)))
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v: %v; get: %w", bucket, key, err)
|
||||
return fmt.Errorf("%v: %v: get error: %w", bucket, key, err)
|
||||
|
||||
}
|
||||
value = e.Value
|
||||
|
||||
// validate value
|
||||
if v.IsDeletedOrExpired() {
|
||||
return fmt.Errorf("%v: %v: get: key does not exist", bucket, key)
|
||||
}
|
||||
|
||||
// read value
|
||||
err = v.Value(func(val []byte) error {
|
||||
value = val
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v: %v: get: value error: %w", bucket, key, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
|
||||
17
cache/get_test.go
vendored
17
cache/get_test.go
vendored
@@ -38,7 +38,7 @@ func TestClient_Get(t *testing.T) {
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "with value",
|
||||
name: "with value ttl",
|
||||
fields: fields{
|
||||
log: zerolog.Logger{},
|
||||
},
|
||||
@@ -52,25 +52,12 @@ func TestClient_Get(t *testing.T) {
|
||||
want: []byte("test"),
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "no value post ttl",
|
||||
fields: fields{
|
||||
log: zerolog.Logger{},
|
||||
},
|
||||
args: args{
|
||||
bucket: "get",
|
||||
key: "test",
|
||||
},
|
||||
sleep: 1 * time.Second,
|
||||
want: nil,
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
c := &Client{
|
||||
log: tt.fields.log,
|
||||
db: newDb(t, "get"),
|
||||
db: newDb(t),
|
||||
}
|
||||
|
||||
if tt.put {
|
||||
|
||||
10
cache/put.go
vendored
10
cache/put.go
vendored
@@ -2,13 +2,17 @@ package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/xujiajun/nutsdb"
|
||||
"github.com/dgraph-io/badger/v3"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (c *Client) Put(bucket string, key string, val []byte, ttl time.Duration) error {
|
||||
if err := c.db.Update(func(tx *nutsdb.Tx) error {
|
||||
return tx.Put(bucket, []byte(key), val, uint32(ttl.Seconds()))
|
||||
if err := c.db.Update(func(txn *badger.Txn) error {
|
||||
e := badger.NewEntry([]byte(fmt.Sprintf("%s_%s", bucket, key)), val)
|
||||
if ttl > 0 {
|
||||
e = e.WithTTL(ttl)
|
||||
}
|
||||
return txn.SetEntry(e)
|
||||
}); err != nil {
|
||||
return fmt.Errorf("%v: %v; put: %w", bucket, key, err)
|
||||
}
|
||||
|
||||
9
cache/put_test.go
vendored
9
cache/put_test.go
vendored
@@ -34,7 +34,7 @@ func TestClient_Put(t *testing.T) {
|
||||
bucket: "put",
|
||||
key: "test",
|
||||
val: []byte("testing"),
|
||||
ttl: 50 * time.Millisecond,
|
||||
ttl: 5 * time.Second,
|
||||
},
|
||||
want: []byte("testing"),
|
||||
wantErr: false,
|
||||
@@ -48,9 +48,9 @@ func TestClient_Put(t *testing.T) {
|
||||
bucket: "put",
|
||||
key: "test",
|
||||
val: []byte("testing"),
|
||||
ttl: 1 * time.Second,
|
||||
ttl: 500 * time.Millisecond,
|
||||
},
|
||||
sleep: 2 * time.Second,
|
||||
sleep: 1 * time.Second,
|
||||
want: nil,
|
||||
wantErr: true,
|
||||
},
|
||||
@@ -65,6 +65,7 @@ func TestClient_Put(t *testing.T) {
|
||||
val: []byte("testing"),
|
||||
ttl: 0,
|
||||
},
|
||||
sleep: 1 * time.Second,
|
||||
want: []byte("testing"),
|
||||
wantErr: false,
|
||||
},
|
||||
@@ -73,7 +74,7 @@ func TestClient_Put(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
c := &Client{
|
||||
log: tt.fields.log,
|
||||
db: newDb(t, "nabarr_put"),
|
||||
db: newDb(t),
|
||||
}
|
||||
|
||||
if err := c.Put(tt.args.bucket, tt.args.key, tt.args.val, tt.args.ttl); (err != nil) != tt.wantErr && tt.sleep == 0 {
|
||||
|
||||
20
cache/test.go
vendored
20
cache/test.go
vendored
@@ -1,24 +1,16 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"github.com/xujiajun/nutsdb"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"github.com/dgraph-io/badger/v3"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func newDb(t *testing.T, dir string) *nutsdb.DB {
|
||||
db, err := nutsdb.Open(nutsdb.Options{
|
||||
Dir: filepath.Join(os.TempDir(), dir),
|
||||
EntryIdxMode: nutsdb.HintKeyValAndRAMIdxMode,
|
||||
SegmentSize: 8 * 1024 * 1024,
|
||||
NodeNum: 1,
|
||||
RWMode: nutsdb.FileIO,
|
||||
SyncEnable: true,
|
||||
StartFileLoadingMode: nutsdb.MMap,
|
||||
})
|
||||
func newDb(t *testing.T) *badger.DB {
|
||||
opts := badger.DefaultOptions("").WithInMemory(true)
|
||||
opts.Logger = nil
|
||||
db, err := badger.Open(opts)
|
||||
if err != nil {
|
||||
t.Fatalf("newDb(dir: %v) open error: %v", dir, err)
|
||||
t.Fatalf("newDb() open error: %v", err)
|
||||
}
|
||||
return db
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user