diff --git a/go.mod b/go.mod index 5062257..57315e8 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.16 require ( github.com/avct/uasurfer v0.0.0-20191028135549-26b5daa857f1 github.com/hack-pad/go-indexeddb v0.1.0 - github.com/hack-pad/hackpadfs v0.1.0 + github.com/hack-pad/hackpadfs v0.1.2 github.com/hack-pad/hush v0.1.0 github.com/johnstarich/go/datasize v0.0.1 github.com/machinebox/progress v0.2.0 diff --git a/go.sum b/go.sum index b7bab68..954e55c 100644 --- a/go.sum +++ b/go.sum @@ -1,17 +1,21 @@ github.com/avct/uasurfer v0.0.0-20191028135549-26b5daa857f1 h1:9h8f71kuF1pqovnn9h7LTHLEjxzyQaj0j1rQq5nsMM4= github.com/avct/uasurfer v0.0.0-20191028135549-26b5daa857f1/go.mod h1:noBAuukeYOXa0aXGqxr24tADqkwDO2KRD15FsuaZ5a8= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw= github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc= github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= +github.com/google/renameio v1.0.1-0.20210406141108-81588dbe0453 h1:vvZyWjAX9oDB+DWpMsZMyv6Q3NZtim2C5Zcdh+H0OmQ= github.com/google/renameio v1.0.1-0.20210406141108-81588dbe0453/go.mod h1:t/HQoYBZSsWSNK35C6CO/TpPLDVWvxOHboWUAweKUpk= github.com/hack-pad/go-indexeddb v0.1.0 h1:UzRAl6WiKxLJePkgi2uaQa9MMPWcjO29zI3pt9D+rNs= github.com/hack-pad/go-indexeddb v0.1.0/go.mod h1:NH8CaojufPNcKYDhy5JkjfyBXE/72oJPeiywlabN/lM= -github.com/hack-pad/hackpadfs v0.1.0 h1:3bItjrgASvPwOU9WZEzcA8pHJRy7FK8Dk6NaQN89SM4= -github.com/hack-pad/hackpadfs v0.1.0/go.mod h1:8bsINHOQhQUioUUiCzCyZZNLfEXjs0RwBIf3lTG+CEg= +github.com/hack-pad/hackpadfs v0.1.1 h1:DhzS50ln5XAOxZ0Xlnb/o3P/+MWUqlcbGdOQ5m+Fg6c= +github.com/hack-pad/hackpadfs v0.1.1/go.mod h1:8bsINHOQhQUioUUiCzCyZZNLfEXjs0RwBIf3lTG+CEg= +github.com/hack-pad/hackpadfs v0.1.2 h1:ZsHfvrNAMNNBVLMKprOiN2rLD37x+YGj3QPJrhUdRF4= +github.com/hack-pad/hackpadfs v0.1.2/go.mod h1:8bsINHOQhQUioUUiCzCyZZNLfEXjs0RwBIf3lTG+CEg= github.com/hack-pad/hush v0.0.0-20210730065049-bd589dbef3a3 h1:0WBvEONkD8zXBRe7+5+mp34L2Upmok0yPKvOqOzpksw= github.com/hack-pad/hush v0.0.0-20210730065049-bd589dbef3a3/go.mod h1:NqjEIfyA2YtlnEPlI/1K3tNuyXGByWFadPxPlGrDPms= github.com/hack-pad/hush v0.1.0 h1:lm/iUaRpVsKkpbN6U9wf45arVnCXzTqsMG1jyihIgkI= @@ -21,6 +25,7 @@ github.com/johnstarich/go/datasize v0.0.1/go.mod h1:4eHLMGz7Q5uCmZeS9rZdahvAih1Q github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1 h1:VkoXIwSboBpnk99O/KFauAEILuNHv5DVFKZMBN/gUgw= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -36,28 +41,35 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-runewidth v0.0.6 h1:V2iyH+aX9C5fsYCpK60U8BYIvmhqxuOL3JZcqc1NB7k= github.com/mattn/go-runewidth v0.0.6/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-tty v0.0.3 h1:5OfyWorkyO7xP52Mq7tB36ajHDG5OHrmBGIS/DtakQI= github.com/mattn/go-tty v0.0.3/go.mod h1:ihxohKRERHTVzN+aSVRwACLCeqIoZAWpoICkkvrWyR0= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e h1:aoZm08cpOy4WuID//EZDgcC4zIxODThtZNPirFr42+A= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.7.0 h1:3qqXGV8nn7GJT65debw77Dzrx9sfWYgP0DDo7xcMFRk= github.com/rogpeppe/go-internal v1.7.0/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -68,19 +80,25 @@ golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210503080704-8803ae5d1324 h1:pAwJxDByZctfPwzlNGrDN2BQLsdPb9NkhoTJtUkAO28= golang.org/x/sys v0.0.0-20210503080704-8803ae5d1324/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20210503060354-a79de5458b56 h1:b8jxX3zqjpqb2LklXPzKSGJhzyxCOZSz8ncv8Nv+y7w= golang.org/x/term v0.0.0-20210503060354-a79de5458b56/go.mod h1:tfny5GFUkzUvx4ps4ajbZsCe5lw1metzhBm9T3x7oIY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +mvdan.cc/editorconfig v0.2.0 h1:XL+7ys6ls/RKrkUNFQvEwIvNHh+JKx8Mj1pUV5wQxQE= mvdan.cc/editorconfig v0.2.0/go.mod h1:lvnnD3BNdBYkhq+B4uBuFFKatfp02eB6HixDvEz91C0= mvdan.cc/sh/v3 v3.3.0 h1:ujzElMnry63f4I5sjPFxzo6xia+gwsHZM0yyauuyZ6k= mvdan.cc/sh/v3 v3.3.0/go.mod h1:dh3avhLDhJJ/MJKzbak6FYn+DJKUWk7Fb6Dh5mGdv6Y= diff --git a/internal/bufferpool/pool.go b/internal/bufferpool/pool.go deleted file mode 100644 index 9d6a6e2..0000000 --- a/internal/bufferpool/pool.go +++ /dev/null @@ -1,60 +0,0 @@ -package bufferpool - -import ( - "go.uber.org/atomic" -) - -type Pool struct { - bufferCount atomic.Int64 - bufferSize uint64 - buffers chan *Buffer -} - -type Buffer struct { - Data []byte - pool *Pool -} - -func New(bufferSize, maxBuffers uint64) *Pool { - if maxBuffers == 0 { - maxBuffers = 1 - } - p := &Pool{ - bufferSize: bufferSize, - buffers: make(chan *Buffer, maxBuffers), - } - p.addBuffer() // start with 1 buffer, ready to go - return p -} - -func (p *Pool) addBuffer() { - for { - count := p.bufferCount.Load() - if int(count) == cap(p.buffers) { - return // already at max buffers, no-op - } - if p.bufferCount.CAS(count, count+1) { - break // successfully provisioned slot for new buffer - } - } - buf := &Buffer{ - Data: make([]byte, p.bufferSize), - pool: p, - } - p.buffers <- buf -} - -func (p *Pool) Wait() *Buffer { - select { - case buf := <-p.buffers: - return buf - default: - p.addBuffer() - // may not always get the new buffer, but looping could allocate more buffers far too quickly - return <-p.buffers - } -} - -func (b *Buffer) Done() { - b.pool.buffers <- b -} diff --git a/internal/fs/fs.go b/internal/fs/fs.go index 19bb8fa..2decb1c 100644 --- a/internal/fs/fs.go +++ b/internal/fs/fs.go @@ -1,17 +1,18 @@ package fs import ( + "compress/gzip" "context" "io" "path" "github.com/hack-pad/hackpad/internal/common" "github.com/hack-pad/hackpad/internal/log" - "github.com/hack-pad/hackpad/internal/tarfs" "github.com/hack-pad/hackpadfs" "github.com/hack-pad/hackpadfs/cache" "github.com/hack-pad/hackpadfs/mem" "github.com/hack-pad/hackpadfs/mount" + "github.com/hack-pad/hackpadfs/tar" "github.com/johnstarich/go/datasize" ) @@ -54,14 +55,21 @@ func Overlay(mountPath string, fs hackpadfs.FS) error { type ShouldCacher func(name string, info hackpadfs.FileInfo) bool -func OverlayTarGzip(mountPath string, r io.ReadCloser, persist bool, shouldCache ShouldCacher) error { +func OverlayTarGzip(mountPath string, gzipReader io.ReadCloser, persist bool, shouldCache ShouldCacher) error { + r, err := gzip.NewReader(gzipReader) + if err != nil { + return err + } + mountPath = common.ResolvePath(".", mountPath) if !persist { underlyingFS, err := mem.NewFS() if err != nil { return err } - fs, err := tarfs.New(r, underlyingFS) + fs, err := tar.NewReaderFS(context.Background(), r, tar.ReaderFSOptions{ + UnarchiveFS: underlyingFS, + }) if err != nil { return err } @@ -95,8 +103,8 @@ func OverlayTarGzip(mountPath string, r io.ReadCloser, persist bool, shouldCache _, err = hackpadfs.Stat(underlyingFS, tarfsDoneMarker) if err == nil { // tarfs already completed successfully and is persisted, - // so close tarfs reader and mount the existing files - r.Close() + // so close top-level reader and mount the existing files + gzipReader.Close() cacheFS, err := newCacheFS(underlyingFS) if err != nil { @@ -112,17 +120,22 @@ func OverlayTarGzip(mountPath string, r io.ReadCloser, persist bool, shouldCache } } - tarFS, err := tarfs.New(r, underlyingFS) + readCtx, readCancel := context.WithCancel(context.Background()) + tarFS, err := tar.NewReaderFS(readCtx, r, tar.ReaderFSOptions{ + UnarchiveFS: underlyingFS, + }) if err != nil { + readCancel() return err } - cacheFS, err := newCacheFS(tarFS) + tarClearFS := newClearCtxFS(underlyingFS, readCancel, tarFS.Done()) + cacheFS, err := newCacheFS(tarClearFS) if err != nil { return err } go func() { <-tarFS.Done() - err := tarFS.InitErr() + err := tarFS.UnarchiveErr() if err != nil { log.Errorf("Failed to initialize mount %q: %v", mountPath, err) return @@ -137,6 +150,34 @@ func OverlayTarGzip(mountPath string, r io.ReadCloser, persist bool, shouldCache return filesystem.AddMount(mountPath, cacheFS) } +type clearCtxFS struct { + cancel context.CancelFunc + wait <-chan struct{} + fs clearFS +} + +func newClearCtxFS(fs clearFS, cancel context.CancelFunc, wait <-chan struct{}) *clearCtxFS { + return &clearCtxFS{ + cancel: cancel, + wait: wait, + fs: fs, + } +} + +func (c *clearCtxFS) Open(name string) (hackpadfs.File, error) { + return c.fs.Open(name) +} + +func (c *clearCtxFS) Clear(ctx context.Context) error { + c.cancel() + select { + case <-c.wait: + return c.fs.Clear(ctx) + case <-ctx.Done(): + return ctx.Err() + } +} + // Dump prints out file system statistics func Dump(basePath string) interface{} { var total int64 diff --git a/internal/pubsub/pubsub.go b/internal/pubsub/pubsub.go deleted file mode 100644 index 48a17a0..0000000 --- a/internal/pubsub/pubsub.go +++ /dev/null @@ -1,69 +0,0 @@ -package pubsub - -import ( - "context" - "sync" -) - -type PubSub interface { - // Emit signals all Waiters waiting on 'key' will unblock. - // Can not be called concurrently. - Emit(key string) - // Wait waits for the 'key' to be emitted or for Close() to be called - Wait(key string) -} - -type pubsub struct { - mu sync.RWMutex - subscribers map[string][]context.CancelFunc - visited map[string]bool - ctx context.Context -} - -// New creates a new PubSub that unblocks all calls to Wait when ctx is canceled -func New(ctx context.Context) PubSub { - return &pubsub{ - ctx: ctx, - subscribers: make(map[string][]context.CancelFunc), - visited: make(map[string]bool), - } -} - -func (ps *pubsub) Emit(key string) { - ps.mu.RLock() - visited := ps.visited[key] - ps.mu.RUnlock() - if visited { - return - } - ps.mu.Lock() - ps.visited[key] = true - funcs := ps.subscribers[key] - ps.subscribers[key] = nil - ps.mu.Unlock() - for _, cancel := range funcs { - cancel() - } -} - -func (ps *pubsub) Wait(key string) { - select { - case <-ps.ctx.Done(): - return - default: - } - - ps.mu.Lock() - if ps.visited[key] { - ps.mu.Unlock() - return - } - ctx, cancel := context.WithCancel(ps.ctx) - ps.subscribers[key] = append(ps.subscribers[key], cancel) - ps.mu.Unlock() - - select { - case <-ps.ctx.Done(): - case <-ctx.Done(): - } -} diff --git a/internal/pubsub/pubsub_test.go b/internal/pubsub/pubsub_test.go deleted file mode 100644 index 51c2c8a..0000000 --- a/internal/pubsub/pubsub_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package pubsub - -import ( - "context" - "testing" -) - -func TestPubSub(t *testing.T) { - t.Run("emit first", func(t *testing.T) { - ps := New(context.Background()) - ps.Emit("hi") - ps.Wait("hi") - }) - - t.Run("wait first", func(t *testing.T) { - ps := New(context.Background()) - wait := make(chan struct{}) - completed := make(chan struct{}) - go func() { - close(wait) - ps.Wait("hi") - close(completed) - }() - <-wait - for _, key := range []string{"a", "b", "c", "hi"} { - ps.Emit(key) - } - <-completed - }) -} diff --git a/internal/tarfs/fs.go b/internal/tarfs/fs.go deleted file mode 100644 index 2ad9f82..0000000 --- a/internal/tarfs/fs.go +++ /dev/null @@ -1,284 +0,0 @@ -package tarfs - -import ( - "archive/tar" - "compress/gzip" - "context" - "io" - "path" - "runtime" - "sync" - - "github.com/hack-pad/hackpad/internal/bufferpool" - "github.com/hack-pad/hackpad/internal/common" - "github.com/hack-pad/hackpad/internal/log" - "github.com/hack-pad/hackpad/internal/pubsub" - "github.com/hack-pad/hackpadfs" - "github.com/pkg/errors" -) - -type FS struct { - underlyingFS BaseFS - ps pubsub.PubSub - ctx context.Context - cancel context.CancelFunc - initErr error -} - -var _ hackpadfs.FS = &FS{} - -type BaseFS interface { - hackpadfs.OpenFileFS - hackpadfs.ChmodFS - hackpadfs.MkdirFS -} - -func New(r io.Reader, underlyingFS BaseFS) (_ *FS, retErr error) { - defer func() { retErr = errors.Wrap(retErr, "tarfs") }() - - if dirEntries, err := hackpadfs.ReadDir(underlyingFS, "."); err != nil || len(dirEntries) != 0 { - var names []string - for _, dirEntry := range dirEntries { - names = append(names, dirEntry.Name()) - } - return nil, errors.Errorf("Root '/' must be an empty directory, got: %T %v %s", underlyingFS, err, names) - } - - ctx, cancel := context.WithCancel(context.Background()) - fs := &FS{ - underlyingFS: underlyingFS, - ps: pubsub.New(ctx), - ctx: ctx, - cancel: cancel, - } - go fs.downloadGzip(r) - return fs, nil -} - -func (fs *FS) downloadGzip(r io.Reader) { - err := fs.downloadGzipErr(r) - if err != nil { - fs.initErr = err - log.Error("tarfs: Failed to complete overlay: ", err) - } - fs.cancel() - - if closer, ok := r.(io.Closer); ok { - _ = closer.Close() - } -} - -func (fs *FS) downloadGzipErr(r io.Reader) error { - compressor, err := gzip.NewReader(r) - if err != nil { - return errors.Wrap(err, "gzip reader") - } - defer compressor.Close() - - archive := tar.NewReader(compressor) - const ( - mebibyte = 1 << 20 - kibibyte = 1 << 10 - maxMemory = 20 * mebibyte - bigBufMemory = 4 * mebibyte - smallBufMemory = 150 * kibibyte - - // at least a couple big and small buffers, then a large quantity of small ones make up the remainder - bigBufCount = 2 - smallBufCount = (maxMemory - bigBufCount*bigBufMemory) / smallBufMemory - ) - smallPool := bufferpool.New(smallBufMemory, smallBufCount) - bigPool := bufferpool.New(bigBufMemory, bigBufCount) - defer runtime.GC() // forcefully clean up memory pools - - mkdirCache := make(map[string]bool) - cachedMkdirAll := func(path string, perm hackpadfs.FileMode) error { - if _, ok := mkdirCache[path]; ok { - return nil - } - err := hackpadfs.MkdirAll(fs.underlyingFS, path, perm) - if err == nil { - mkdirCache[path] = true - } - return err - } - - var wg sync.WaitGroup - errs := make(chan error, 1) - for { - select { - case err := <-errs: - return err - default: - } - header, err := archive.Next() - if err == io.EOF { - break - } - if err != nil { - return errors.Wrap(err, "next tar file") - } - err = fs.initProcessFile(header, archive, &wg, errs, cachedMkdirAll, bigPool, smallPool) - if err != nil { - return err - } - } - - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - select { - case err := <-errs: - return err - case <-done: - return nil - } -} - -func (fs *FS) initProcessFile( - header *tar.Header, r io.Reader, - wg *sync.WaitGroup, errs chan error, - mkdirAll func(string, hackpadfs.FileMode) error, - bigPool, smallPool *bufferpool.Pool, -) error { - select { - case <-fs.ctx.Done(): - return fs.ctx.Err() - default: - } - - originalName := header.Name - p := common.ResolvePath(".", originalName) - info := header.FileInfo() - - dir := path.Dir(p) - err := mkdirAll(dir, 0700) - if err != nil { - return errors.Wrap(err, "prepping base dir") - } - - if info.IsDir() { - // assume dir does not exist yet, then chmod if it does exist - wg.Add(1) - go func() { - defer wg.Done() - err := fs.underlyingFS.Mkdir(p, info.Mode()) - if err != nil { - if !errors.Is(err, hackpadfs.ErrExist) { - errs <- errors.Wrap(err, "copying dir") - return - } - err = fs.underlyingFS.Chmod(p, info.Mode()) - if err != nil { - errs <- errors.Wrap(err, "copying dir") - return - } - } - }() - return nil - } - - reader := fullReader{r} // fullReader: call f.Write as few times as possible, since large files are expensive - // read once. if we reached EOF, then write it to fs asynchronously - smallBuf := smallPool.Wait() - n, err := reader.Read(smallBuf.Data) - switch err { - case io.EOF: - wg.Add(1) - go func() { - err := fs.writeFile(p, info, smallBuf, n, nil, nil) - smallBuf.Done() - if err != nil { - errs <- err - } - wg.Done() - }() - return nil - case nil: - bigBuf := bigPool.Wait() - err := fs.writeFile(p, info, smallBuf, n, reader, bigBuf) - bigBuf.Done() - smallBuf.Done() - return err - default: - return err - } -} - -func (fs *FS) writeFile(path string, info hackpadfs.FileInfo, initialBuf *bufferpool.Buffer, n int, r io.Reader, copyBuf *bufferpool.Buffer) (returnedErr error) { - f, err := fs.underlyingFS.OpenFile(path, hackpadfs.FlagWriteOnly|hackpadfs.FlagCreate|hackpadfs.FlagTruncate, info.Mode()) - if err != nil { - return errors.Wrap(err, "opening destination file") - } - defer func() { - f.Close() - if returnedErr == nil { - fs.ps.Emit(path) // only emit for non-dirs, dirs will wait until the download completes to ensure correctness - } - }() - - fWriter, ok := f.(io.Writer) - if !ok { - return hackpadfs.ErrNotImplemented - } - - _, err = fWriter.Write(initialBuf.Data[:n]) - if err != nil { - return errors.Wrap(err, "write: copying file") - } - - if r == nil { - // a nil reader signals we already did a read of N bytes and hit EOF, - // so the above copy is sufficient, return now - return nil - } - - _, err = io.CopyBuffer(fWriter, r, copyBuf.Data) - return errors.Wrap(err, "copybuf: copying file") -} - -type fullReader struct { - io.Reader -} - -func (f fullReader) Read(p []byte) (n int, err error) { - n, err = io.ReadFull(f.Reader, p) - if err == io.ErrUnexpectedEOF { - err = io.EOF - } - return -} - -func (fs *FS) Open(name string) (hackpadfs.File, error) { - if !hackpadfs.ValidPath(name) { - return nil, &hackpadfs.PathError{Op: "open", Path: name, Err: hackpadfs.ErrInvalid} - } - fs.ps.Wait(name) - if fs.initErr != nil { - return nil, &hackpadfs.PathError{Op: "open", Path: name, Err: fs.initErr} - } - return fs.underlyingFS.Open(name) -} - -func (fs *FS) Done() <-chan struct{} { - return fs.ctx.Done() -} - -func (fs *FS) InitErr() error { - return fs.initErr -} - -type clearFS interface { - Clear(ctx context.Context) error -} - -func (fs *FS) Clear(ctx context.Context) (err error) { - if clearFS, ok := fs.underlyingFS.(clearFS); ok { - fs.initErr = context.Canceled - fs.cancel() - return clearFS.Clear(ctx) - } - return errors.New("Unsupported operation for base FS") -} diff --git a/internal/tarfs/fs_test.go b/internal/tarfs/fs_test.go deleted file mode 100644 index d791e9f..0000000 --- a/internal/tarfs/fs_test.go +++ /dev/null @@ -1,146 +0,0 @@ -package tarfs - -import ( - "archive/tar" - "bytes" - "compress/gzip" - "io" - "runtime" - "testing" - "time" - - "github.com/hack-pad/hackpadfs" - "github.com/hack-pad/hackpadfs/fstest" - "github.com/hack-pad/hackpadfs/mem" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestFS(t *testing.T) { - options := fstest.FSOptions{ - Name: "tar", - Setup: fstest.TestSetupFunc(func(tb testing.TB) (fstest.SetupFS, func() hackpadfs.FS) { - setupFS, err := mem.NewFS() - require.NoError(tb, err) - - return setupFS, func() hackpadfs.FS { - return newTarFromFS(tb, setupFS) - } - }), - } - fstest.FS(t, options) - fstest.File(t, options) -} - -func newTarFromFS(tb testing.TB, src hackpadfs.FS) *FS { - r, err := buildTarFromFS(src) - require.NoError(tb, err) - memFS, err := mem.NewFS() - require.NoError(tb, err) - - fs, err := New(r, memFS) - require.NoError(tb, err) - return fs -} - -func buildTarFromFS(src hackpadfs.FS) (io.Reader, error) { - var buf bytes.Buffer - compressor := gzip.NewWriter(&buf) - defer compressor.Close() - - archive := tar.NewWriter(compressor) - defer archive.Close() - - err := hackpadfs.WalkDir(src, ".", copyTarWalk(src, archive)) - return &buf, errors.Wrap(err, "Failed building tar from FS walk") -} - -func copyTarWalk(src hackpadfs.FS, archive *tar.Writer) hackpadfs.WalkDirFunc { - return func(path string, dir hackpadfs.DirEntry, err error) error { - if err != nil { - return err - } - info, err := dir.Info() - if err != nil { - return err - } - header, err := tar.FileInfoHeader(info, "") - if err != nil { - return err - } - header.Name = path - if info.IsDir() { - header.Name += "/" - } - err = archive.WriteHeader(header) - if err != nil { - return err - } - fileBytes, err := hackpadfs.ReadFile(src, path) - if err != nil { - return err - } - _, err = archive.Write(fileBytes) - return err - } -} - -func TestNewFromFs(t *testing.T) { - for _, tc := range []struct { - description string - do func(t *testing.T, fs hackpadfs.FS) - }{ - { - description: "empty", - do: func(t *testing.T, fs hackpadfs.FS) {}, - }, - { - description: "one file", - do: func(t *testing.T, fs hackpadfs.FS) { - _, err := hackpadfs.Create(fs, "foo") - require.NoError(t, err) - }, - }, - { - description: "one dir", - do: func(t *testing.T, fs hackpadfs.FS) { - err := hackpadfs.Mkdir(fs, "foo", 0700) - require.NoError(t, err) - }, - }, - { - description: "dir with one nested file", - do: func(t *testing.T, fs hackpadfs.FS) { - err := hackpadfs.Mkdir(fs, "foo", 0700) - require.NoError(t, err) - _, err = hackpadfs.Create(fs, "foo/bar") - require.NoError(t, err) - }, - }, - } { - t.Run(tc.description, func(t *testing.T) { - memFS, err := mem.NewFS() - require.NoError(t, err) - tc.do(t, memFS) - timer := time.NewTimer(50 * time.Millisecond) - done := make(chan struct{}) - - go func() { - tarFS := newTarFromFS(t, memFS) - assert.NoError(t, err) - assert.NotNil(t, tarFS) - close(done) - }() - - select { - case <-done: - timer.Stop() - case <-timer.C: - buf := make([]byte, 4096) - n := runtime.Stack(buf, true) - t.Fatalf("Took too long:\n%s", string(buf[:n])) - } - }) - } -}