From 581cc2e538d4c0330d2e0a058686fd48ddf40e22 Mon Sep 17 00:00:00 2001 From: Jayant Date: Thu, 23 May 2024 14:13:08 +0800 Subject: [PATCH] perf: use dirtmake to reduce memclr cost (#1314) --- go.mod | 2 +- go.sum | 3 ++- pkg/generic/thrift/http.go | 3 ++- pkg/generic/thrift/json.go | 3 ++- pkg/mem/span.go | 14 ++++++++++---- pkg/protocol/bthrift/binary.go | 9 +++++---- pkg/remote/default_bytebuf.go | 12 +++++++----- pkg/remote/trans/nphttp2/buffer_test.go | 5 +++++ pkg/remote/trans/nphttp2/client_conn.go | 6 ++++-- pkg/remote/trans/nphttp2/grpc/framer.go | 3 ++- pkg/remote/trans/nphttp2/server_conn.go | 6 ++++-- pkg/utils/fastthrift/fast_thrift.go | 8 ++++++-- 12 files changed, 50 insertions(+), 24 deletions(-) diff --git a/go.mod b/go.mod index 0ff78b779e..090222c564 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.13 require ( github.com/apache/thrift v0.13.0 - github.com/bytedance/gopkg v0.0.0-20230728082804-614d0af6619b + github.com/bytedance/gopkg v0.0.0-20240514070511-01b2cbcf35e1 github.com/bytedance/mockey v1.2.7 github.com/bytedance/sonic v1.11.6 github.com/cloudwego/configmanager v0.2.2 diff --git a/go.sum b/go.sum index c4a8047d3f..ecb07edf56 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,9 @@ github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7/go.mod h1:2ZlV9BaUH4+NXIBF0aMdKKAnHTzqH+iMU4KUjAbL23Q= -github.com/bytedance/gopkg v0.0.0-20230728082804-614d0af6619b h1:R6PWoQtxEMpWJPHnpci+9LgFxCS7iJCfOGBvCgZeTKI= github.com/bytedance/gopkg v0.0.0-20230728082804-614d0af6619b/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ= +github.com/bytedance/gopkg v0.0.0-20240514070511-01b2cbcf35e1 h1:rT7Mm6uUpHeZQzfs2v0Mlj0SL02CzyVi+EB7VYPM/z4= +github.com/bytedance/gopkg v0.0.0-20240514070511-01b2cbcf35e1/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ= github.com/bytedance/mockey v1.2.7 h1:8j4yCqS5OmMe2dQCxPit4FVkwTK9nrykIgbOZN3s28o= github.com/bytedance/mockey v1.2.7/go.mod h1:bNrUnI1u7+pAc0TYDgPATM+wF2yzHxmNH+iDXg4AOCU= github.com/bytedance/sonic v1.11.5/go.mod h1:X2PC2giUdj/Cv2lliWFLk6c/DUQok5rViJSemeB0wDw= diff --git a/pkg/generic/thrift/http.go b/pkg/generic/thrift/http.go index 61e6cc8a05..53b15109d2 100644 --- a/pkg/generic/thrift/http.go +++ b/pkg/generic/thrift/http.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/apache/thrift/lib/go/thrift" + "github.com/bytedance/gopkg/lang/dirtmake" "github.com/cloudwego/dynamicgo/conv" "github.com/cloudwego/dynamicgo/conv/t2j" dthrift "github.com/cloudwego/dynamicgo/thrift" @@ -174,7 +175,7 @@ func (r *ReadHTTPResponse) Read(ctx context.Context, method string, in thrift.TP } tyDsc := fnDsc.Response() // json size is usually 2 times larger than equivalent thrift data - buf := make([]byte, 0, len(transBuf)*2) + buf := dirtmake.Bytes(0, len(transBuf)*2) for _, field := range tyDsc.Struct().Fields() { if fid == field.ID() { diff --git a/pkg/generic/thrift/json.go b/pkg/generic/thrift/json.go index 594ef735e2..e769d3f76a 100644 --- a/pkg/generic/thrift/json.go +++ b/pkg/generic/thrift/json.go @@ -22,6 +22,7 @@ import ( "strconv" "github.com/apache/thrift/lib/go/thrift" + "github.com/bytedance/gopkg/lang/dirtmake" "github.com/cloudwego/dynamicgo/conv" "github.com/cloudwego/dynamicgo/conv/t2j" dthrift "github.com/cloudwego/dynamicgo/thrift" @@ -204,7 +205,7 @@ func (m *ReadJSON) Read(ctx context.Context, method string, in thrift.TProtocol) } // json size is usually 2 times larger than equivalent thrift data - buf := make([]byte, 0, len(transBuff)*2) + buf := dirtmake.Bytes(0, len(transBuff)*2) // thrift []byte to json []byte if err := m.t2jBinaryConv.DoInto(ctx, tyDsc, transBuff, &buf); err != nil { return nil, err diff --git a/pkg/mem/span.go b/pkg/mem/span.go index 687e27a2f0..cdc3727ef4 100644 --- a/pkg/mem/span.go +++ b/pkg/mem/span.go @@ -19,6 +19,8 @@ package mem import ( "math/bits" "sync/atomic" + + "github.com/bytedance/gopkg/lang/dirtmake" ) const ( @@ -32,6 +34,8 @@ type spanCache struct { spans [spanCacheSize]*span } +// NewSpanCache returns *spanCache with the given spanSize, +// each span is used to allocate a binary of a specific size level. func NewSpanCache(spanSize int) *spanCache { c := new(spanCache) for i := 0; i < len(c.spans); i++ { @@ -40,10 +44,12 @@ func NewSpanCache(spanSize int) *spanCache { return c } +// Make allocates a binary but does not clear the memory it references. +// NOTE: MUST set any byte element before it's read. func (c *spanCache) Make(n int) []byte { sclass := spanClass(n) - minSpanClass if sclass < 0 || sclass >= len(c.spans) { - return make([]byte, n) + return dirtmake.Bytes(n, n) } return c.spans[sclass].Make(n) } @@ -57,7 +63,7 @@ func (c *spanCache) Copy(buf []byte) (p []byte) { func NewSpan(size int) *span { sp := new(span) sp.size = uint32(size) - sp.buffer = make([]byte, 0, size) + sp.buffer = dirtmake.Bytes(0, size) return sp } @@ -72,7 +78,7 @@ func (b *span) Make(_n int) []byte { n := uint32(_n) if n >= b.size || !atomic.CompareAndSwapUint32(&b.lock, 0, 1) { // fallback path: make a new byte slice if current goroutine cannot get the lock or n is out of size - return make([]byte, n) + return dirtmake.Bytes(int(n), int(n)) } START: b.read += n @@ -83,7 +89,7 @@ START: return buf } // slow path: create a new buffer - b.buffer = make([]byte, b.size) + b.buffer = dirtmake.Bytes(int(b.size), int(b.size)) b.read = 0 goto START } diff --git a/pkg/protocol/bthrift/binary.go b/pkg/protocol/bthrift/binary.go index b1b4024530..2b5dbf3aa8 100644 --- a/pkg/protocol/bthrift/binary.go +++ b/pkg/protocol/bthrift/binary.go @@ -474,17 +474,18 @@ func (binaryProtocol) ReadString(buf []byte) (value string, length int, err erro } func (binaryProtocol) ReadBinary(buf []byte) (value []byte, length int, err error) { - size, l, e := Binary.ReadI32(buf) + _size, l, e := Binary.ReadI32(buf) length += l if e != nil { err = e return } - if size < 0 || int(size) > len(buf) { + size := int(_size) + if size < 0 || size > len(buf) { return value, length, perrors.NewProtocolErrorWithType(thrift.INVALID_DATA, "[ReadBinary] the binary size greater than buf length") } - value = spanCache.Copy(buf[length : length+int(size)]) - length += int(size) + value = spanCache.Copy(buf[length : length+size]) + length += size return } diff --git a/pkg/remote/default_bytebuf.go b/pkg/remote/default_bytebuf.go index 1a8ff17933..fe7b3f3a94 100644 --- a/pkg/remote/default_bytebuf.go +++ b/pkg/remote/default_bytebuf.go @@ -21,6 +21,8 @@ import ( "fmt" "io" "sync" + + "github.com/bytedance/gopkg/lang/dirtmake" ) // Mask bits. @@ -78,7 +80,7 @@ func newWriterByteBuffer(estimatedLength int) ByteBuffer { estimatedLength = 256 // default buffer size } bytebuf := bytebufPool.Get().(*defaultByteBuffer) - bytebuf.buff = make([]byte, estimatedLength) + bytebuf.buff = dirtmake.Bytes(estimatedLength, estimatedLength) bytebuf.status = BitWritable bytebuf.writeIdx = 0 return bytebuf @@ -89,7 +91,7 @@ func newReaderWriterByteBuffer(estimatedLength int) ByteBuffer { estimatedLength = 256 // default buffer size } bytebuf := bytebufPool.Get().(*defaultByteBuffer) - bytebuf.buff = make([]byte, estimatedLength) + bytebuf.buff = dirtmake.Bytes(estimatedLength, estimatedLength) bytebuf.readIdx = 0 bytebuf.writeIdx = 0 bytebuf.status = BitReadable | BitWritable @@ -188,7 +190,7 @@ func (b *defaultByteBuffer) ReadBinary(n int) (p []byte, err error) { if buf, err = b.Next(n); err != nil { return p, err } - p = make([]byte, n) + p = dirtmake.Bytes(n, n) copy(p, buf) return p, nil } @@ -274,7 +276,7 @@ func (b *defaultByteBuffer) Bytes() (buf []byte, err error) { if b.status&BitWritable == 0 { return nil, errors.New("unwritable buffer, cannot support Bytes") } - buf = make([]byte, b.writeIdx) + buf = dirtmake.Bytes(b.writeIdx, b.writeIdx) copy(buf, b.buff[:b.writeIdx]) return buf, nil } @@ -327,7 +329,7 @@ func (b *defaultByteBuffer) ensureWritable(minWritableBytes int) { newCapacity <<= 1 } - buf := make([]byte, newCapacity) + buf := dirtmake.Bytes(newCapacity, newCapacity) copy(buf, b.buff) b.buff = buf } diff --git a/pkg/remote/trans/nphttp2/buffer_test.go b/pkg/remote/trans/nphttp2/buffer_test.go index dba1a175e4..f68ade46b9 100644 --- a/pkg/remote/trans/nphttp2/buffer_test.go +++ b/pkg/remote/trans/nphttp2/buffer_test.go @@ -25,6 +25,11 @@ import ( func TestBuffer(t *testing.T) { // test NewBuffer() grpcConn := newMockNpConn(mockAddr0) + grpcConn.mockConn.ReadFunc = func(b []byte) (n int, err error) { + s := make([]byte, len(b)) + copy(b, s) + return len(b), nil + } buffer := newBuffer(grpcConn) // mock conn only return bytes with 0,0,0..... diff --git a/pkg/remote/trans/nphttp2/client_conn.go b/pkg/remote/trans/nphttp2/client_conn.go index 34929daa2a..e855d30bfa 100644 --- a/pkg/remote/trans/nphttp2/client_conn.go +++ b/pkg/remote/trans/nphttp2/client_conn.go @@ -25,6 +25,8 @@ import ( "strings" "time" + "github.com/bytedance/gopkg/lang/dirtmake" + "github.com/cloudwego/kitex/pkg/remote" "github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes" "github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/grpc" @@ -51,13 +53,13 @@ type clientConn struct { var _ GRPCConn = (*clientConn)(nil) func (c *clientConn) ReadFrame() (hdr, data []byte, err error) { - hdr = make([]byte, 5) + hdr = dirtmake.Bytes(5, 5) _, err = c.Read(hdr) if err != nil { return nil, nil, err } dLen := int(binary.BigEndian.Uint32(hdr[1:])) - data = make([]byte, dLen) + data = dirtmake.Bytes(dLen, dLen) _, err = c.Read(data) if err != nil { return nil, nil, err diff --git a/pkg/remote/trans/nphttp2/grpc/framer.go b/pkg/remote/trans/nphttp2/grpc/framer.go index aadf661129..c27a74cefb 100644 --- a/pkg/remote/trans/nphttp2/grpc/framer.go +++ b/pkg/remote/trans/nphttp2/grpc/framer.go @@ -20,6 +20,7 @@ import ( "io" "net" + "github.com/bytedance/gopkg/lang/dirtmake" "github.com/cloudwego/netpoll" "golang.org/x/net/http2/hpack" @@ -64,7 +65,7 @@ type bufWriter struct { func newBufWriter(writer io.Writer, batchSize int) *bufWriter { return &bufWriter{ - buf: make([]byte, batchSize*2), + buf: dirtmake.Bytes(batchSize*2, batchSize*2), batchSize: batchSize, writer: writer, } diff --git a/pkg/remote/trans/nphttp2/server_conn.go b/pkg/remote/trans/nphttp2/server_conn.go index 80011692d2..4fac524be5 100644 --- a/pkg/remote/trans/nphttp2/server_conn.go +++ b/pkg/remote/trans/nphttp2/server_conn.go @@ -22,6 +22,8 @@ import ( "net" "time" + "github.com/bytedance/gopkg/lang/dirtmake" + "github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes" "github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/grpc" "github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/status" @@ -43,13 +45,13 @@ func newServerConn(tr grpc.ServerTransport, s *grpc.Stream) *serverConn { } func (c *serverConn) ReadFrame() (hdr, data []byte, err error) { - hdr = make([]byte, 5) + hdr = dirtmake.Bytes(5, 5) _, err = c.Read(hdr) if err != nil { return nil, nil, err } dLen := int(binary.BigEndian.Uint32(hdr[1:])) - data = make([]byte, dLen) + data = dirtmake.Bytes(dLen, dLen) _, err = c.Read(data) if err != nil { return nil, nil, err diff --git a/pkg/utils/fastthrift/fast_thrift.go b/pkg/utils/fastthrift/fast_thrift.go index 213622cf60..c15d40f4dc 100644 --- a/pkg/utils/fastthrift/fast_thrift.go +++ b/pkg/utils/fastthrift/fast_thrift.go @@ -16,11 +16,15 @@ package fastthrift -import "github.com/cloudwego/kitex/pkg/remote/codec/thrift" +import ( + "github.com/bytedance/gopkg/lang/dirtmake" + + "github.com/cloudwego/kitex/pkg/remote/codec/thrift" +) // FastMarshal marshals the msg to buf. The msg should be generated by Kitex tool and implement ThriftMsgFastCodec. func FastMarshal(msg thrift.ThriftMsgFastCodec) []byte { - buf := make([]byte, msg.BLength()) + buf := dirtmake.Bytes(msg.BLength(), msg.BLength()) msg.FastWriteNocopy(buf, nil) return buf }