Commit 7575d93
ec: refactor and simplify; docs: remove all gitlab references
Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Dec 8, 2023
1 parent 708ed1a commit 7575d93
Showing 15 changed files with 171 additions and 167 deletions.
docs/_posts/2023-04-03-transform-images-with-python-sdk.md (8 changes: 4 additions & 4 deletions)
````diff
@@ -17,13 +17,13 @@ In a previous series of posts, we transformed the ImageNet dataset using a mixtu
 
 As we did in the posts above, we'll assume that an instance of AIStore has been already deployed on Kubernetes. All the code below will expect an `AIS_ENDPOINT` environment variable set to the cluster's endpoint.
 
-> To set up a local Kubernetes cluster with Minikube, checkout the [docs here](https://gitlab-master.nvidia.com/aistorage/aistore/-/blob/master/deploy/dev/k8s/README.md). For more advanced deployments, take a look at our dedicated [ais-k8s repository](https://github.com/NVIDIA/ais-k8s/).
+> To set up a local Kubernetes cluster with Minikube, checkout the [docs here](https://github.com/NVIDIA/aistore/tree/master/deploy/dev/k8s). For more advanced deployments, take a look at our dedicated [ais-k8s repository](https://github.com/NVIDIA/ais-k8s/).
 We'll be using PyTorch's `torchvision` to transform [The Oxford-IIIT Pet Dataset](https://www.robots.ox.ac.uk/~vgg/data/pets/) - as illustrated:
 
 ![AIS-ETL Overview](/assets/ais_etl_series/ais-etl-overview.png)
 
-To interact with the cluster, we'll be using the [AIS Python SDK](https://gitlab-master.nvidia.com/aistorage/aistore/-/tree/master/python). Set up your Python environment and install the following requirements:
+To interact with the cluster, we'll be using the [AIS Python SDK](https://github.com/NVIDIA/aistore/tree/master/python). Set up your Python environment and install the following requirements:
 
 ```text
 aistore
@@ -33,7 +33,7 @@ torch
 
 ## The Dataset
 
-For this demo we will be using the [Oxford-IIIT Pet Dataset](https://www.robots.ox.ac.uk/~vgg/data/pets/) since it is less than 1GB. The [ImageNet Dataset](https://image-net.org/index.php) is another reasonable choice, but consists of much larger downloads.
+For this demo we will be using the [Oxford-IIIT Pet Dataset](https://www.robots.ox.ac.uk/~vgg/data/pets/) since it is less than 1GB. The [ImageNet Dataset](https://image-net.org/index.php) is another reasonable choice, but consists of much larger downloads.
 
 Once downloaded, the dataset includes an `images` and an `annotations` folder. For this example we will focus on the `images` directory, which consists of different sized `.jpg` images.
 
@@ -213,7 +213,7 @@ Full code examples for each action above can be found [here](/examples/transform
 1. [AIStore & ETL: Introduction](https://aiatscale.org/blog/2021/10/21/ais-etl-1)
 2. GitHub:
    - [AIStore](https://github.com/NVIDIA/aistore)
-   - [Local Kubernetes Deployment](https://gitlab-master.nvidia.com/aistorage/aistore/-/blob/master/deploy/dev/k8s/README.md)
+   - [Local Kubernetes Deployment](https://github.com/NVIDIA/aistore/blob/master/deploy/dev/k8s/README.md)
    - [AIS/Kubernetes Operator, AIS on bare-metal, Deployment Playbooks, Helm](https://github.com/NVIDIA/ais-k8s)
    - [AIS-ETL containers and specs](https://github.com/NVIDIA/ais-etl)
 3. Documentation, blogs, videos:
````
docs/_posts/2023-05-08-aisio-transforms-with-webdataset-pt-1.md (22 changes: 11 additions & 11 deletions)
````diff
@@ -151,26 +151,26 @@ print([entry.name for entry in objects])
 ```
 Output:
 ```
-shuffled-shard-0.tar 102.36MiB
-shuffled-shard-1.tar 102.44MiB
-shuffled-shard-2.tar 102.40MiB
-shuffled-shard-3.tar 102.45MiB
-shuffled-shard-4.tar 102.36MiB
-shuffled-shard-5.tar 102.40MiB
-shuffled-shard-6.tar 102.49MiB
-shuffled-shard-7.tar 74.84MiB
+shuffled-shard-0.tar 102.36MiB
+shuffled-shard-1.tar 102.44MiB
+shuffled-shard-2.tar 102.40MiB
+shuffled-shard-3.tar 102.45MiB
+shuffled-shard-4.tar 102.36MiB
+shuffled-shard-5.tar 102.40MiB
+shuffled-shard-6.tar 102.49MiB
+shuffled-shard-7.tar 74.84MiB
 ```
 
 Finally we have our result: WebDataset-formatted, shuffled shards stored in AIS and ready for use!
 
-In future posts, we'll show how to run transformations on this data and load it for model training.
+In future posts, we'll show how to run transformations on this data and load it for model training.
 
----
+---
 ## References
 
 1. GitHub:
    - [AIStore](https://github.com/NVIDIA/aistore)
-   - [Local Kubernetes Deployment](https://gitlab-master.nvidia.com/aistorage/aistore/-/blob/master/deploy/dev/k8s/README.md)
+   - [Local Kubernetes Deployment](https://github.com/NVIDIA/aistore/blob/master/deploy/dev/k8s/README.md)
    - [AIS/Kubernetes Operator, AIS on bare-metal, Deployment Playbooks, Helm](https://github.com/NVIDIA/ais-k8s)
    - [WebDataset Library](https://github.com/webdataset/webdataset)
 2. Documentation, blogs, videos:
````
ec/bckencodexact.go (11 changes: 5 additions & 6 deletions)
```diff
@@ -29,7 +29,6 @@ type (
 	}
 	XactBckEncode struct {
 		xact.Base
-		t    cluster.Target
 		bck  *meta.Bck
 		wg   *sync.WaitGroup // to wait for EC finishes all objects
 		smap *meta.Smap
@@ -53,7 +52,7 @@ func (*encFactory) New(args xreg.Args, bck *meta.Bck) xreg.Renewable {
 }
 
 func (p *encFactory) Start() error {
-	p.xctn = newXactBckEncode(p.Bck, p.T, p.UUID())
+	p.xctn = newXactBckEncode(p.Bck, p.UUID())
 	return nil
 }
 
@@ -75,16 +74,16 @@ func (p *encFactory) WhenPrevIsRunning(prevEntry xreg.Renewable) (wpr xreg.WPR,
 // XactBckEncode //
 ///////////////////
 
-func newXactBckEncode(bck *meta.Bck, t cluster.Target, uuid string) (r *XactBckEncode) {
-	r = &XactBckEncode{t: t, bck: bck, wg: &sync.WaitGroup{}, smap: t.Sowner().Get()}
+func newXactBckEncode(bck *meta.Bck, uuid string) (r *XactBckEncode) {
+	r = &XactBckEncode{bck: bck, wg: &sync.WaitGroup{}, smap: g.t.Sowner().Get()}
 	r.InitBase(uuid, apc.ActECEncode, bck)
 	return
 }
 
 func (r *XactBckEncode) Run(wg *sync.WaitGroup) {
 	wg.Done()
 	bck := r.bck
-	if err := bck.Init(r.t.Bowner()); err != nil {
+	if err := bck.Init(g.t.Bowner()); err != nil {
 		r.AddErr(err)
 		r.Finish()
 		return
@@ -96,7 +95,7 @@ func (r *XactBckEncode) Run(wg *sync.WaitGroup) {
 	}
 
 	opts := &mpather.JgroupOpts{
-		T:        r.t,
+		T:        g.t,
 		CTs:      []string{fs.ObjectType},
 		VisitObj: r.bckEncode,
 		DoLoad:   mpather.LoadUnsafe,
```
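The change above is the commit's recurring move: `XactBckEncode` drops its stored `t cluster.Target` field, and constructors stop threading the target through every call, reading it instead from a package-level singleton `g` (introduced in `ec/ec.go` below). A minimal, self-contained sketch of the idiom (hypothetical names, not the actual AIStore types):

```go
package main

import "fmt"

// Target stands in for cluster.Target: the node-local dependency that
// previously had to be passed to, and stored by, every xaction.
type Target interface{ Name() string }

type target struct{ name string }

func (t *target) Name() string { return t.name }

// global mirrors the commit's `g global` idiom: package-level state,
// wired once at startup and treated as read-only afterwards.
type global struct{ t Target }

var g global

// Init is the single wiring point (the real Init also registers
// content resolvers and xaction factories).
func Init(t Target) { g.t = t }

// xactEncode no longer carries its own target field...
type xactEncode struct{ bucket string }

// ...and its constructor no longer takes one.
func newXactEncode(bucket string) *xactEncode {
	return &xactEncode{bucket: bucket}
}

func (x *xactEncode) Run() {
	// All reads go through g.t instead of a per-struct copy.
	fmt.Printf("%s: encoding bucket %q\n", g.t.Name(), x.bucket)
}

func main() {
	Init(&target{name: "t[ec-demo]"})
	newXactEncode("mybucket").Run()
}
```

The trade-off is the usual one for singletons: constructors and call chains get shorter (compare `newXactBckEncode` before and after), at the cost of an implicit dependency that tests must wire via `Init`.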
ec/ec.go (98 changes: 56 additions & 42 deletions)
```diff
@@ -179,31 +179,41 @@ type (
 	}
 )
 
-var (
-	emptyReq request
+type global struct {
+	t        cluster.Target
 	reqPool  sync.Pool
 	mm       *memsys.MMSA // memory manager and slab/SGL allocator
+	emptyReq request
+}
+
+var g global
+
+var (
 	ErrorECDisabled = errors.New("EC is disabled for bucket")
 	ErrorNoMetafile = errors.New("no metafile")
 	ErrorNotFound   = errors.New("not found")
 )
 
-func allocateReq(action string, lif cluster.LIF) (req *request) {
-	if v := reqPool.Get(); v != nil {
-		req = v.(*request)
-	} else {
-		req = &request{}
+func Init(t cluster.Target) {
+	g.t = t
+	g.mm = t.PageMM()
+
+	fs.CSM.Reg(fs.ECSliceType, &fs.ECSliceContentResolver{})
+	fs.CSM.Reg(fs.ECMetaType, &fs.ECMetaContentResolver{})
+
+	xreg.RegBckXact(&getFactory{})
+	xreg.RegBckXact(&putFactory{})
+	xreg.RegBckXact(&rspFactory{})
+	xreg.RegBckXact(&encFactory{})
+
+	if err := initManager(); err != nil {
+		cos.ExitLogf("Failed to init manager: %v", err)
 	}
-	req.Action = action
-	req.LIF = lif
-	return
 }
 
-func freeReq(req *request) {
-	*req = emptyReq
-	reqPool.Put(req)
-}
+///////////
+// slice //
+///////////
 
 // Free allocated memory and removes slice's temporary file
 func (s *slice) free() {
@@ -271,20 +281,24 @@ func (s *slice) reopenReader() (reader cos.ReadOpenCloser, err error) {
 	return reader, err
 }
 
-func Init(t cluster.Target) {
-	mm = t.PageMM()
-
-	fs.CSM.Reg(fs.ECSliceType, &fs.ECSliceContentResolver{})
-	fs.CSM.Reg(fs.ECMetaType, &fs.ECMetaContentResolver{})
-
-	xreg.RegBckXact(&getFactory{})
-	xreg.RegBckXact(&putFactory{})
-	xreg.RegBckXact(&rspFactory{})
-	xreg.RegBckXact(&encFactory{})
+//
+// misc. utils
+//
 
-	if err := initManager(t); err != nil {
-		cos.ExitLogf("Failed to init manager: %v", err)
+func allocateReq(action string, lif cluster.LIF) (req *request) {
+	if v := g.reqPool.Get(); v != nil {
+		req = v.(*request)
+	} else {
+		req = &request{}
 	}
+	req.Action = action
+	req.LIF = lif
+	return
 }
 
+func freeReq(req *request) {
+	*req = g.emptyReq
+	g.reqPool.Put(req)
+}
 
 // SliceSize returns the size of one slice that EC will create for the object
@@ -310,7 +324,7 @@ func useDisk(objSize int64, config *cmn.Config) bool {
 	if config.EC.DiskOnly {
 		return true
 	}
-	memPressure := mm.Pressure()
+	memPressure := g.mm.Pressure()
 	switch memPressure {
 	case memsys.OOM, memsys.PressureExtreme:
 		return true
@@ -379,7 +393,7 @@ func RequestECMeta(bck *cmn.Bck, objName string, si *meta.Snode, client *http.Cl
 }
 
 // Saves the main replica to local drives
-func writeObject(t cluster.Target, lom *cluster.LOM, reader io.Reader, size int64, xctn cluster.Xact) error {
+func writeObject(lom *cluster.LOM, reader io.Reader, size int64, xctn cluster.Xact) error {
 	if size > 0 {
 		reader = io.LimitReader(reader, size)
 	}
@@ -394,26 +408,26 @@
 		// to avoid changing version; TODO: introduce cmn.OwtEC
 		params.OWT = cmn.OwtMigrate
 	}
-	err := t.PutObject(lom, params)
+	err := g.t.PutObject(lom, params)
 	cluster.FreePutObjParams(params)
 	return err
 }
 
-func validateBckBID(t cluster.Target, bck *cmn.Bck, bid uint64) error {
+func validateBckBID(bck *cmn.Bck, bid uint64) error {
 	if bid == 0 {
 		return nil
 	}
 	newBck := meta.CloneBck(bck)
-	err := newBck.Init(t.Bowner())
+	err := newBck.Init(g.t.Bowner())
 	if err == nil && newBck.Props.BID != bid {
 		err = fmt.Errorf("bucket ID mismatch: local %d, sender %d", newBck.Props.BID, bid)
 	}
 	return err
 }
 
 // WriteSliceAndMeta saves slice and its metafile
-func WriteSliceAndMeta(t cluster.Target, hdr *transport.ObjHdr, args *WriteArgs) error {
-	ct, err := cluster.NewCTFromBO(&hdr.Bck, hdr.ObjName, t.Bowner(), fs.ECSliceType)
+func WriteSliceAndMeta(hdr *transport.ObjHdr, args *WriteArgs) error {
+	ct, err := cluster.NewCTFromBO(&hdr.Bck, hdr.ObjName, g.t.Bowner(), fs.ECSliceType)
 	if err != nil {
 		return err
 	}
@@ -437,23 +451,23 @@ func WriteSliceAndMeta(t cluster.Target, hdr *transport.ObjHdr, args *WriteArgs)
 		}
 	}
 	tmpFQN := ct.Make(fs.WorkfileType)
-	if err := ct.Write(t, args.Reader, hdr.ObjAttrs.Size, tmpFQN); err != nil {
+	if err := ct.Write(g.t, args.Reader, hdr.ObjAttrs.Size, tmpFQN); err != nil {
 		return err
 	}
-	if err := ctMeta.Write(t, bytes.NewReader(args.MD), -1); err != nil {
+	if err := ctMeta.Write(g.t, bytes.NewReader(args.MD), -1); err != nil {
 		return err
 	}
-	if _, exists := t.Bowner().Get().Get(ctMeta.Bck()); !exists {
+	if _, exists := g.t.Bowner().Get().Get(ctMeta.Bck()); !exists {
 		err = fmt.Errorf("slice-and-meta: %s metafile saved while bucket %s was being destroyed",
 			ctMeta.ObjectName(), ctMeta.Bucket())
 		return err
 	}
-	err = validateBckBID(t, &hdr.Bck, args.BID)
+	err = validateBckBID(&hdr.Bck, args.BID)
 	return err
 }
 
 // WriteReplicaAndMeta saves replica and its metafile
-func WriteReplicaAndMeta(t cluster.Target, lom *cluster.LOM, args *WriteArgs) (err error) {
+func WriteReplicaAndMeta(lom *cluster.LOM, args *WriteArgs) (err error) {
 	lom.Lock(false)
 	if args.Generation != 0 {
 		ctMeta := cluster.NewCTFromLOM(lom, fs.ECMetaType)
@@ -464,7 +478,7 @@ func WriteReplicaAndMeta(t cluster.Target, lom *cluster.LOM, args *WriteArgs) (e
 	}
 	lom.Unlock(false)
 
-	if err = writeObject(t, lom, args.Reader, lom.SizeBytes(true), args.Xact); err != nil {
+	if err = writeObject(lom, args.Reader, lom.SizeBytes(true), args.Xact); err != nil {
 		return
 	}
 	if !args.Cksum.IsEmpty() && args.Cksum.Value() != "" { // NOTE: empty value
@@ -488,14 +502,14 @@ func WriteReplicaAndMeta(t cluster.Target, lom *cluster.LOM, args *WriteArgs) (e
 			nlog.Errorf("nested error: save replica -> remove metafile: %v", rmErr)
 		}
 	}()
-	if err = ctMeta.Write(t, bytes.NewReader(args.MD), -1); err != nil {
+	if err = ctMeta.Write(g.t, bytes.NewReader(args.MD), -1); err != nil {
 		return
 	}
-	if _, exists := t.Bowner().Get().Get(ctMeta.Bck()); !exists {
+	if _, exists := g.t.Bowner().Get().Get(ctMeta.Bck()); !exists {
 		err = fmt.Errorf("replica-and-meta: %s metafile saved while bucket %s was being destroyed",
 			ctMeta.ObjectName(), ctMeta.Bucket())
 		return
 	}
-	err = validateBckBID(t, lom.Bucket(), args.BID)
+	err = validateBckBID(lom.Bucket(), args.BID)
 	return
 }
```
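Beyond relocating `Init`, the new `global` struct also takes ownership of the request pool: `allocateReq`/`freeReq` recycle request objects through a `sync.Pool`, wiping each one with the stored zero value `g.emptyReq` before putting it back. A self-contained sketch of that alloc/reset/free pattern (hypothetical `request` type, not the AIStore one):

```go
package main

import (
	"fmt"
	"sync"
)

// request is a hypothetical stand-in for the ec package's request type.
type request struct {
	Action string
}

type global struct {
	reqPool  sync.Pool
	emptyReq request // zero value used to wipe a request before pooling it
}

var g global

func allocateReq(action string) (req *request) {
	// Reuse a pooled request when available; allocate otherwise.
	if v := g.reqPool.Get(); v != nil {
		req = v.(*request)
	} else {
		req = &request{}
	}
	req.Action = action
	return
}

func freeReq(req *request) {
	*req = g.emptyReq // reset every field so no state leaks to the next user
	g.reqPool.Put(req)
}

func main() {
	req := allocateReq("ec-encode")
	fmt.Println(req.Action)
	freeReq(req)
}
```

Assigning `g.emptyReq` wholesale is cheaper and less error-prone than clearing fields individually, and it guarantees no stale `Action` or `LIF` survives into the next pooled use.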
ec/getjogger.go (8 changes: 4 additions & 4 deletions)
```diff
@@ -248,7 +248,7 @@ func (c *getJogger) restoreReplicatedFromMemory(ctx *restoreCtx) error {
 		Generation: ctx.meta.Generation,
 		Xact:       c.parent,
 	}
-	if err := WriteReplicaAndMeta(c.parent.t, ctx.lom, args); err != nil {
+	if err := WriteReplicaAndMeta(ctx.lom, args); err != nil {
 		writer.Free()
 		return err
 	}
@@ -370,7 +370,7 @@ func (c *getJogger) requestSlices(ctx *restoreCtx) error {
 		}
 	} else {
 		writer = &slice{
-			writer: mm.NewSGL(cos.KiB * 512),
+			writer: g.mm.NewSGL(cos.KiB * 512),
 			twg:    wgSlices,
 		}
 	}
@@ -430,7 +430,7 @@ func newSliceWriter(ctx *restoreCtx, writers []io.Writer, restored []*slice,
 		}
 		restored[idx] = &slice{workFQN: fqn, n: sliceSize}
 	} else {
-		sgl := mm.NewSGL(sliceSize)
+		sgl := g.mm.NewSGL(sliceSize)
 		restored[idx] = &slice{obj: sgl, n: sliceSize}
 		if cksumType != cos.ChecksumNone {
 			cksums[idx] = cos.NewCksumHash(cksumType)
@@ -605,7 +605,7 @@ func (c *getJogger) restoreMainObj(ctx *restoreCtx) ([]*slice, error) {
 		Generation: mainMeta.Generation,
 		Xact:       c.parent,
 	}
-	err = WriteReplicaAndMeta(c.parent.t, ctx.lom, args)
+	err = WriteReplicaAndMeta(ctx.lom, args)
 	return restored, err
 }
 
```
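`newSliceWriter` here, together with `useDisk` in `ec.go` above, makes the same decision in two places: when memory is tight (or `EC.DiskOnly` is set), restored slices go to disk-backed workfiles; otherwise they land in memory, in SGLs served by `g.mm`. A simplified, self-contained sketch of that choice, with a plain `bytes.Buffer` and a temp file standing in for SGLs and workfiles (assumed pressure levels, not the actual memsys API):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

// Assumed pressure levels, standing in for the memsys package's scale.
const (
	pressureLow = iota
	pressureHigh
	pressureOOM
)

// newSliceWriter mirrors the decision visible in the diff: file-backed
// under memory pressure (or when disk-only is forced), in-memory otherwise.
func newSliceWriter(diskOnly bool, pressure int) (w io.Writer, cleanup func(), err error) {
	if diskOnly || pressure >= pressureHigh {
		f, err := os.CreateTemp("", "ec-slice-*")
		if err != nil {
			return nil, nil, err
		}
		return f, func() { f.Close(); os.Remove(f.Name()) }, nil
	}
	return &bytes.Buffer{}, func() {}, nil
}

func main() {
	w, cleanup, err := newSliceWriter(false, pressureLow)
	if err != nil {
		panic(err)
	}
	defer cleanup()
	io.WriteString(w, "restored slice bytes")
	fmt.Printf("writer type: %T\n", w) // *bytes.Buffer here; *os.File under pressure
}
```

Gating both restore paths on the same predicate lets EC degrade gracefully from memory to disk as pressure rises, instead of failing outright.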
(The remaining 10 of the 15 changed files are not shown.)
