diff --git a/states/kv/kv.go b/states/kv/kv.go index f6998ff..657a769 100644 --- a/states/kv/kv.go +++ b/states/kv/kv.go @@ -8,7 +8,6 @@ import ( "encoding/json" "fmt" "math" - "path" "strings" "time" @@ -74,7 +73,7 @@ func NewEtcdKV(client *clientv3.Client) *etcdKV { // Load returns value of the key. func (kv *etcdKV) Load(ctx context.Context, key string) (string, error) { - key = path.Join(kv.rootPath, key) + key = joinPath(kv.rootPath, key) resp, err := kv.client.Get(ctx, key) if err != nil { return "", err @@ -87,7 +86,7 @@ func (kv *etcdKV) Load(ctx context.Context, key string) (string, error) { // LoadWithPrefix returns all the keys and values with the given key prefix. func (kv *etcdKV) LoadWithPrefix(ctx context.Context, key string) ([]string, []string, error) { - key = path.Join(kv.rootPath, key) + key = joinPath(kv.rootPath, key) resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) if err != nil { @@ -104,27 +103,27 @@ func (kv *etcdKV) LoadWithPrefix(ctx context.Context, key string) ([]string, []s // Save saves the key-value pair. func (kv *etcdKV) Save(ctx context.Context, key, value string) error { - key = path.Join(kv.rootPath, key) + key = joinPath(kv.rootPath, key) _, err := kv.client.Put(ctx, key, value) return err } // Remove removes the key. func (kv *etcdKV) Remove(ctx context.Context, key string) error { - key = path.Join(kv.rootPath, key) + key = joinPath(kv.rootPath, key) _, err := kv.client.Delete(ctx, key) return err } // RemoveWithPrefix removes the keys with given prefix. func (kv *etcdKV) RemoveWithPrefix(ctx context.Context, prefix string) error { - key := path.Join(kv.rootPath, prefix) + key := joinPath(kv.rootPath, prefix) _, err := kv.client.Delete(ctx, key, clientv3.WithPrefix()) return err } func (kv *etcdKV) removeWithPrevKV(ctx context.Context, key string) (*mvccpb.KeyValue, error) { - key = path.Join(kv.rootPath, key) + key = joinPath(kv.rootPath, key) resp, err := kv.client.Delete(ctx, key, clientv3.WithPrevKV()) if err != nil { return nil, err @@ -136,7 +135,7 @@ func (kv *etcdKV) removeWithPrevKV(ctx context.Context, key string) (*mvccpb.Key } func (kv *etcdKV) removeWithPrefixAndPrevKV(ctx context.Context, prefix string) ([]*mvccpb.KeyValue, error) { - key := path.Join(kv.rootPath, prefix) + key := joinPath(kv.rootPath, prefix) resp, err := kv.client.Delete(ctx, key, clientv3.WithPrefix(), clientv3.WithPrevKV()) return resp.PrevKvs, err } @@ -180,7 +179,7 @@ func writeBackupBytes(w *bufio.Writer, data []byte) { func (kv *etcdKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision bool, batchSize int64) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() - resp, err := kv.client.Get(ctx, path.Join(base, prefix), clientv3.WithCountOnly(), clientv3.WithPrefix()) + resp, err := kv.client.Get(ctx, joinPath(base, prefix), clientv3.WithCountOnly(), clientv3.WithPrefix()) if err != nil { return err } @@ -200,7 +199,7 @@ func (kv *etcdKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision parts := strings.Split(base, "/") if len(parts) > 1 { metaPath = parts[len(parts)-1] - instance = path.Join(parts[:len(parts)-1]...) + instance = joinPath(parts[:len(parts)-1]...) } else { instance = base } @@ -230,7 +229,7 @@ func (kv *etcdKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision options = append(options, clientv3.WithRev(rev)) } - currentKey := path.Join(base, prefix) + currentKey := joinPath(base, prefix) var i int prefixBS := []byte(currentKey) for int64(i) < cnt { @@ -323,7 +322,7 @@ func NewTiKV(txn *txnkv.Client) *txnTiKV { // Load returns value of the key. func (kv *txnTiKV) Load(ctx context.Context, key string) (string, error) { - key = path.Join(kv.rootPath, key) + key = joinPath(kv.rootPath, key) ss := kv.client.GetSnapshot(MaxSnapshotTS) ss.SetScanBatchSize(SnapshotScanSize) @@ -341,7 +340,7 @@ func (kv *txnTiKV) Load(ctx context.Context, key string) (string, error) { // LoadWithPrefix returns all the keys and values for the given key prefix. func (kv *txnTiKV) LoadWithPrefix(ctx context.Context, prefix string) ([]string, []string, error) { - prefix = path.Join(kv.rootPath, prefix) + prefix = joinPath(kv.rootPath, prefix) ss := kv.client.GetSnapshot(MaxSnapshotTS) ss.SetScanBatchSize(SnapshotScanSize) @@ -377,7 +376,7 @@ func (kv *txnTiKV) LoadWithPrefix(ctx context.Context, prefix string) ([]string, // Save saves the input key-value pair. func (kv *txnTiKV) Save(ctx context.Context, key, value string) error { - key = path.Join(kv.rootPath, key) + key = joinPath(kv.rootPath, key) txn, err := kv.client.Begin() if err != nil { @@ -398,7 +397,7 @@ func (kv *txnTiKV) Save(ctx context.Context, key, value string) error { // Remove removes the input key. func (kv *txnTiKV) Remove(ctx context.Context, key string) error { - key = path.Join(kv.rootPath, key) + key = joinPath(kv.rootPath, key) txn, err := kv.client.Begin() if err != nil { @@ -414,7 +413,7 @@ func (kv *txnTiKV) Remove(ctx context.Context, key string) error { // RemoveWithPrefix removes the keys for the given prefix. func (kv *txnTiKV) RemoveWithPrefix(ctx context.Context, prefix string) error { - prefix = path.Join(kv.rootPath, prefix) + prefix = joinPath(kv.rootPath, prefix) ctx, cancel := context.WithTimeout(ctx, RequestTimeout) defer cancel() @@ -502,7 +501,7 @@ func (kv *txnTiKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision ss := txn.GetSnapshot() ss.SetScanBatchSize(SnapshotScanSize) - keyprefix := path.Join(base, prefix) + keyprefix := joinPath(base, prefix) startKey := []byte(keyprefix) endKey := tikv.PrefixNextKey([]byte(keyprefix)) iter, err := ss.Iter(startKey, endKey) @@ -525,7 +524,7 @@ func (kv *txnTiKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision parts := strings.Split(base, "/") if len(parts) > 1 { metaPath = parts[len(parts)-1] - instance = path.Join(parts[:len(parts)-1]...) + instance = joinPath(parts[:len(parts)-1]...) } else { instance = base } diff --git a/states/kv/utils.go b/states/kv/utils.go new file mode 100644 index 0000000..7e035cb --- /dev/null +++ b/states/kv/utils.go @@ -0,0 +1,14 @@ +package kv + +import ( + "path" + "strings" +) + +func joinPath(parts ...string) string { + r := path.Join(parts...) + if len(parts) > 0 && strings.HasSuffix(parts[len(parts)-1], "/") { + r += "/" + } + return r +}