
Commit e14dad0

OleksiienkoMykyta committed Jan 8, 2025
1 parent 37030fb commit e14dad0
Showing 13 changed files with 1,013 additions and 981 deletions.
8 changes: 4 additions & 4 deletions conn.go
@@ -1276,7 +1276,7 @@ func (c *Conn) prepareStatement(ctx context.Context, stmt string, tracer Tracer)
flight.preparedStatment = &preparedStatment{
// defensively copy as we will recycle the underlying buffer after we
// return.
- id: copyBytes(x.preparedID),
+ id: internal.CopyBytes(x.preparedID),
// the type info's should _not_ have a reference to the framers read buffer,
// therefore we can just copy them directly.
request: x.reqMeta,
@@ -1308,7 +1308,7 @@ func marshalQueryValue(typ TypeInfo, value interface{}, dst *queryValues) error
value = named.value
}

- if _, ok := value.(unsetColumn); !ok {
+ if _, ok := value.(internal.UnsetColumn); !ok {
val, err := Marshal(typ, value)
if err != nil {
return err
@@ -1431,7 +1431,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
if params.skipMeta {
if info != nil {
iter.meta = info.response
- iter.meta.pagingState = copyBytes(x.meta.pagingState)
+ iter.meta.pagingState = internal.CopyBytes(x.meta.pagingState)
} else {
return &Iter{framer: framer, err: errors.New("gocql: did not receive metadata but prepared info is nil")}
}
@@ -1442,7 +1442,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
if x.meta.morePages() && !qry.disableAutoPage {
newQry := new(Query)
*newQry = *qry
- newQry.pageState = copyBytes(x.meta.pagingState)
+ newQry.pageState = internal.CopyBytes(x.meta.pagingState)
newQry.metrics = &queryMetrics{m: make(map[string]*hostMetrics)}

iter.next = &nextIter{
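Note: the conn.go hunks above all make the same change — byte slices that outlive the response frame (the prepared statement id and the paging state) are defensively copied with internal.CopyBytes, instead of the old package-local copyBytes, before the framer's read buffer is recycled. A minimal standalone sketch of why that copy matters (buffer contents and names below are illustrative, not from this commit):

```go
package main

import "fmt"

// copyBytes mirrors internal.CopyBytes from this commit: it returns a
// detached copy so the source buffer can be reused safely afterwards.
func copyBytes(p []byte) []byte {
	b := make([]byte, len(p))
	copy(b, p)
	return b
}

func main() {
	frameBuf := []byte{0xca, 0xfe, 0xba, 0xbe} // stand-in for a framer's read buffer
	pagingState := copyBytes(frameBuf)         // defensive copy, as in executeQuery

	// Recycling (overwriting) the framer buffer no longer corrupts the saved state.
	for i := range frameBuf {
		frameBuf[i] = 0
	}
	fmt.Printf("%x\n", pagingState) // prints cafebabe
}
```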
2 changes: 1 addition & 1 deletion control.go
@@ -50,7 +50,7 @@ func init() {
panic(fmt.Sprintf("unable to seed random number generator: %v", err))
}

- randr = rand.New(rand.NewSource(int64(readInt(b))))
+ randr = rand.New(rand.NewSource(int64(internal.ReadInt(b))))
}

const (
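The control.go hunk swaps readInt for internal.ReadInt when turning crypto-random bytes into a seed for math/rand. A self-contained sketch of that seeding pattern, assuming a 4-byte buffer (the mirrored readInt only looks at the first four bytes):

```go
package main

import (
	crand "crypto/rand"
	"fmt"
	mrand "math/rand"
)

// readInt mirrors internal.ReadInt: big-endian decode of the first four bytes.
func readInt(p []byte) int32 {
	return int32(p[0])<<24 | int32(p[1])<<16 | int32(p[2])<<8 | int32(p[3])
}

func main() {
	b := make([]byte, 4)
	if _, err := crand.Read(b); err != nil {
		panic(fmt.Sprintf("unable to seed random number generator: %v", err))
	}
	randr := mrand.New(mrand.NewSource(int64(readInt(b))))
	fmt.Println(randr.Intn(100)) // a value in [0,100), seeded from crypto/rand
}
```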
70 changes: 11 additions & 59 deletions frame.go
@@ -28,6 +28,7 @@ import (
"context"
"errors"
"fmt"
"github.com/gocql/gocql/internal"
"io"
"io/ioutil"
"net"
@@ -36,16 +37,14 @@ import (
"time"
)

type unsetColumn struct{}

// UnsetValue represents a value used in a query binding that will be ignored by Cassandra.
//
// By setting a field to the unset value Cassandra will ignore the write completely.
// The main advantage is the ability to keep the same prepared statement even when you don't
// want to update some fields, where before you needed to make another prepared statement.
//
// UnsetValue is only available when using the version 4 of the protocol.
- var UnsetValue = unsetColumn{}
+ var UnsetValue = internal.UnsetColumn{}

type namedValue struct {
name string
@@ -331,10 +330,6 @@ var (

const maxFrameHeaderSize = 9

func readInt(p []byte) int32 {
return int32(p[0])<<24 | int32(p[1])<<16 | int32(p[2])<<8 | int32(p[3])
}

type frameHeader struct {
version protoVersion
flags byte
@@ -474,15 +469,15 @@ func readHeader(r io.Reader, p []byte) (head frameHeader, err error) {

head.stream = int(int16(p[2])<<8 | int16(p[3]))
head.op = frameOp(p[4])
- head.length = int(readInt(p[5:]))
+ head.length = int(internal.ReadInt(p[5:]))
} else {
if len(p) != 8 {
return frameHeader{}, fmt.Errorf("not enough bytes to read header require 8 got: %d", len(p))
}

head.stream = int(int8(p[2]))
head.op = frameOp(p[3])
- head.length = int(readInt(p[4:]))
+ head.length = int(internal.ReadInt(p[4:]))
}

return head, nil
@@ -647,7 +642,7 @@ func (f *framer) parseErrorFrame() frame {
stmtId := f.readShortBytes()
return &RequestErrUnprepared{
errorFrame: errD,
- StatementId: copyBytes(stmtId), // defensively copy
+ StatementId: internal.CopyBytes(stmtId), // defensively copy
}
case ErrCodeReadFailure:
res := &RequestErrReadFailure{
@@ -969,7 +964,7 @@ func (f *framer) parsePreparedMetadata() preparedMetadata {
}

if meta.flags&flagHasMorePages == flagHasMorePages {
- meta.pagingState = copyBytes(f.readBytes())
+ meta.pagingState = internal.CopyBytes(f.readBytes())
}

if meta.flags&flagNoMetaData == flagNoMetaData {
@@ -1057,7 +1052,7 @@ func (f *framer) parseResultMetadata() resultMetadata {
meta.actualColCount = meta.colCount

if meta.flags&flagHasMorePages == flagHasMorePages {
- meta.pagingState = copyBytes(f.readBytes())
+ meta.pagingState = internal.CopyBytes(f.readBytes())
}

if meta.flags&flagNoMetaData == flagNoMetaData {
@@ -1940,49 +1935,6 @@ func (f *framer) writeByte(b byte) {
f.buf = append(f.buf, b)
}

func appendBytes(p []byte, d []byte) []byte {
if d == nil {
return appendInt(p, -1)
}
p = appendInt(p, int32(len(d)))
p = append(p, d...)
return p
}

func appendShort(p []byte, n uint16) []byte {
return append(p,
byte(n>>8),
byte(n),
)
}

func appendInt(p []byte, n int32) []byte {
return append(p, byte(n>>24),
byte(n>>16),
byte(n>>8),
byte(n))
}

func appendUint(p []byte, n uint32) []byte {
return append(p, byte(n>>24),
byte(n>>16),
byte(n>>8),
byte(n))
}

func appendLong(p []byte, n int64) []byte {
return append(p,
byte(n>>56),
byte(n>>48),
byte(n>>40),
byte(n>>32),
byte(n>>24),
byte(n>>16),
byte(n>>8),
byte(n),
)
}

func (f *framer) writeCustomPayload(customPayload *map[string][]byte) {
if len(*customPayload) > 0 {
if f.proto < protoVersion4 {
@@ -1994,19 +1946,19 @@ func (f *framer) writeCustomPayload(customPayload *map[string][]byte) {

// these are protocol level binary types
func (f *framer) writeInt(n int32) {
- f.buf = appendInt(f.buf, n)
+ f.buf = internal.AppendInt(f.buf, n)
}

func (f *framer) writeUint(n uint32) {
- f.buf = appendUint(f.buf, n)
+ f.buf = internal.AppendUint(f.buf, n)
}

func (f *framer) writeShort(n uint16) {
- f.buf = appendShort(f.buf, n)
+ f.buf = internal.AppendShort(f.buf, n)
}

func (f *framer) writeLong(n int64) {
- f.buf = appendLong(f.buf, n)
+ f.buf = internal.AppendLong(f.buf, n)
}

func (f *framer) writeString(s string) {
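For context, the UnsetValue documented near the top of frame.go (now backed by internal.UnsetColumn) is bound in place of a value you do not want to write, so a single prepared statement can serve partial updates. A hedged usage sketch — the contact point, keyspace, table and column names are assumptions for illustration, not part of this commit, and protocol v4+ is required:

```go
package main

import (
	"log"

	"github.com/gocql/gocql"
)

func main() {
	cluster := gocql.NewCluster("127.0.0.1") // assumed contact point
	cluster.Keyspace = "example"             // hypothetical keyspace
	cluster.ProtoVersion = 4                 // UnsetValue needs protocol v4+
	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	// Binding gocql.UnsetValue makes Cassandra skip that column entirely,
	// so the same prepared UPDATE works whether or not email is provided.
	if err := session.Query(
		`UPDATE users SET name = ?, email = ? WHERE id = ?`,
		"alice",
		gocql.UnsetValue, // leave email as it is
		gocql.TimeUUID(), // placeholder id value
	).Exec(); err != nil {
		log.Fatal(err)
	}
}
```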
39 changes: 3 additions & 36 deletions helpers.go
@@ -26,6 +26,7 @@ package gocql

import (
"fmt"
"github.com/gocql/gocql/internal"
"math/big"
"net"
"reflect"
@@ -176,7 +177,7 @@ func getCassandraType(name string, logger StdLogger) TypeInfo {
Elem: getCassandraType(strings.TrimPrefix(name[:len(name)-1], "list<"), logger),
}
} else if strings.HasPrefix(name, "map<") {
names := splitCompositeTypes(strings.TrimPrefix(name[:len(name)-1], "map<"))
names := internal.SplitCompositeTypes(strings.TrimPrefix(name[:len(name)-1], "map<"))
if len(names) != 2 {
logger.Printf("Error parsing map type, it has %d subelements, expecting 2\n", len(names))
return NativeType{
@@ -189,7 +190,7 @@ func getCassandraType(name string, logger StdLogger) TypeInfo {
Elem: getCassandraType(names[1], logger),
}
} else if strings.HasPrefix(name, "tuple<") {
names := splitCompositeTypes(strings.TrimPrefix(name[:len(name)-1], "tuple<"))
names := internal.SplitCompositeTypes(strings.TrimPrefix(name[:len(name)-1], "tuple<"))
types := make([]TypeInfo, len(names))

for i, name := range names {
@@ -207,34 +208,6 @@ func getCassandraType(name string, logger StdLogger) TypeInfo {
}
}

func splitCompositeTypes(name string) []string {
if !strings.Contains(name, "<") {
return strings.Split(name, ", ")
}
var parts []string
lessCount := 0
segment := ""
for _, char := range name {
if char == ',' && lessCount == 0 {
if segment != "" {
parts = append(parts, strings.TrimSpace(segment))
}
segment = ""
continue
}
segment += string(char)
if char == '<' {
lessCount++
} else if char == '>' {
lessCount--
}
}
if segment != "" {
parts = append(parts, strings.TrimSpace(segment))
}
return parts
}

func apacheToCassandraType(t string) string {
t = strings.Replace(t, apacheCassandraTypePrefix, "", -1)
t = strings.Replace(t, "(", "<", -1)
Expand Down Expand Up @@ -451,12 +424,6 @@ func (iter *Iter) MapScan(m map[string]interface{}) bool {
return false
}

func copyBytes(p []byte) []byte {
b := make([]byte, len(p))
copy(b, p)
return b
}

var failDNS = false

func LookupIP(host string) ([]net.IP, error) {
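The helpers.go hunks only swap splitCompositeTypes for internal.SplitCompositeTypes inside getCassandraType, which turns CQL type strings into TypeInfo values. A rough test-style sketch of what the map branch is expected to produce; it is illustrative only and would have to live inside package gocql, since getCassandraType is unexported (nopLogger is a made-up no-op StdLogger, not part of this commit):

```go
package gocql

import "testing"

// nopLogger satisfies StdLogger so any error paths stay quiet.
type nopLogger struct{}

func (nopLogger) Print(v ...interface{})                 {}
func (nopLogger) Printf(format string, v ...interface{}) {}
func (nopLogger) Println(v ...interface{})               {}

// Illustrative only: exercises the map< > branch of getCassandraType shown
// above, which now delegates splitting to internal.SplitCompositeTypes.
func TestGetCassandraTypeMapSketch(t *testing.T) {
	ti := getCassandraType("map<varchar, list<int>>", nopLogger{})
	ct, ok := ti.(CollectionType)
	if !ok {
		t.Fatalf("expected CollectionType, got %#v", ti)
	}
	if ct.Type() != TypeMap || ct.Key == nil || ct.Elem == nil {
		t.Fatalf("unexpected parse result: %#v", ct)
	}
}
```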
50 changes: 50 additions & 0 deletions internal/frame.go
@@ -0,0 +1,50 @@
package internal

type UnsetColumn struct{}

func ReadInt(p []byte) int32 {
return int32(p[0])<<24 | int32(p[1])<<16 | int32(p[2])<<8 | int32(p[3])
}

func AppendBytes(p []byte, d []byte) []byte {
if d == nil {
return AppendInt(p, -1)
}
p = AppendInt(p, int32(len(d)))
p = append(p, d...)
return p
}

func AppendShort(p []byte, n uint16) []byte {
return append(p,
byte(n>>8),
byte(n),
)
}

func AppendInt(p []byte, n int32) []byte {
return append(p, byte(n>>24),
byte(n>>16),
byte(n>>8),
byte(n))
}

func AppendUint(p []byte, n uint32) []byte {
return append(p, byte(n>>24),
byte(n>>16),
byte(n>>8),
byte(n))
}

func AppendLong(p []byte, n int64) []byte {
return append(p,
byte(n>>56),
byte(n>>48),
byte(n>>40),
byte(n>>32),
byte(n>>24),
byte(n>>16),
byte(n>>8),
byte(n),
)
}
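These helpers carry the CQL binary protocol's big-endian integer encoding that previously lived in frame.go. A small test-style sketch of the intended round-trip behaviour, written as if it sat next to internal/frame.go:

```go
package internal

import "testing"

// Round-trip sketch: AppendInt writes big-endian, ReadInt reads it back.
func TestAppendReadIntRoundTrip(t *testing.T) {
	buf := AppendInt(nil, -559038737) // 0xDEADBEEF interpreted as int32
	if len(buf) != 4 {
		t.Fatalf("expected 4 bytes, got %d", len(buf))
	}
	if got := ReadInt(buf); got != -559038737 {
		t.Fatalf("round trip mismatch: %d", got)
	}

	// AppendBytes prefixes the payload with its length (or -1 for nil),
	// matching the [bytes] notation in the CQL protocol spec.
	framed := AppendBytes(nil, []byte{0x01, 0x02})
	if ReadInt(framed) != 2 || len(framed) != 6 {
		t.Fatalf("unexpected framing: %x", framed)
	}
}
```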
37 changes: 37 additions & 0 deletions internal/helpers.go
@@ -0,0 +1,37 @@
package internal

import "strings"

func SplitCompositeTypes(name string) []string {
if !strings.Contains(name, "<") {
return strings.Split(name, ", ")
}
var parts []string
lessCount := 0
segment := ""
for _, char := range name {
if char == ',' && lessCount == 0 {
if segment != "" {
parts = append(parts, strings.TrimSpace(segment))
}
segment = ""
continue
}
segment += string(char)
if char == '<' {
lessCount++
} else if char == '>' {
lessCount--
}
}
if segment != "" {
parts = append(parts, strings.TrimSpace(segment))
}
return parts
}

func CopyBytes(p []byte) []byte {
b := make([]byte, len(p))
copy(b, p)
return b
}
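SplitCompositeTypes splits a composite CQL type string only at top-level commas, so nested type parameters stay intact. A test-style sketch of that behaviour, as it might sit next to internal/helpers.go:

```go
package internal

import (
	"reflect"
	"testing"
)

// Sketch: only commas at nesting depth zero separate elements, so nested
// type parameters such as frozen<map<...>> stay in one piece.
func TestSplitCompositeTypesSketch(t *testing.T) {
	got := SplitCompositeTypes("varchar, frozen<map<int, list<text>>>, int")
	want := []string{"varchar", "frozen<map<int, list<text>>>", "int"}
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("got %v, want %v", got, want)
	}

	// Without any angle brackets the input is split on ", " directly.
	if got := SplitCompositeTypes("text, uuid"); len(got) != 2 {
		t.Fatalf("got %v", got)
	}
}
```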