Skip to content

Commit

Permalink
Merge branch 'master' into opaqueReset
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinavdangeti authored Nov 30, 2023
2 parents 34a214f + acaf783 commit 480f41d
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 20 deletions.
8 changes: 8 additions & 0 deletions new.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,14 @@ func (s *interim) convert() (uint64, []uint64, uint64, error) {
}
}

// after persisting the sections to the writer, account corresponding
for _, opaque := range s.opaque {
opaqueIO, ok := opaque.(segment.DiskStatsReporter)
if ok {
s.incrementBytesWritten(opaqueIO.BytesWritten())
}
}

if len(s.results) == 0 {
dictOffsets = make([]uint64, len(s.FieldsInv))
}
Expand Down
21 changes: 20 additions & 1 deletion section_faiss_vector_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,8 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint
}
}

// accounts for whatever data has been written out to the writer.
vo.incrementBytesWritten(uint64(w.Count() - fieldStart))
vo.fieldAddrs[fieldID] = fieldStart
}
return 0, nil
Expand Down Expand Up @@ -639,6 +641,8 @@ type vecInfo struct {
type vectorIndexOpaque struct {
init bool

bytesWritten uint64

lastNumVecs int
lastNumFields int

Expand All @@ -664,6 +668,21 @@ func (v *vectorIndexOpaque) realloc() {
v.fieldAddrs = make(map[uint16]int, v.lastNumFields)
}

// incrementBytesWritten adds val to the running count of bytes this
// opaque has persisted to the writer, for disk-stats reporting.
func (v *vectorIndexOpaque) incrementBytesWritten(val uint64) {
	v.bytesWritten = v.bytesWritten + val
}

// BytesWritten reports the total number of bytes this opaque has
// written out so far (see incrementBytesWritten).
func (v *vectorIndexOpaque) BytesWritten() uint64 {
	total := v.bytesWritten
	return total
}

// BytesRead always reports zero: this opaque only tracks writes during
// persistence; read accounting is handled elsewhere.
func (v *vectorIndexOpaque) BytesRead() uint64 { return 0 }

// ResetBytesRead is a no-op; this opaque does not maintain a
// bytes-read counter (BytesRead always returns 0).
func (v *vectorIndexOpaque) ResetBytesRead(uint64) {}

// cleanup stuff over here for reusability
func (v *vectorIndexOpaque) Reset() (err error) {
// tracking the number of vecs and fields processed and tracked in this
Expand All @@ -679,6 +698,6 @@ func (v *vectorIndexOpaque) Reset() (err error) {

return nil
}
func (v *vectorIndexOpaque) Set(key string, val interface{}) {

// Set is a no-op: this opaque does not accept arbitrary key/value
// configuration.
func (v *vectorIndexOpaque) Set(key string, val interface{}) {}
25 changes: 23 additions & 2 deletions section_inverted_text_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,20 @@ func (i *invertedIndexOpaque) grabBuf(size int) []byte {
return buf[:size]
}

// incrementBytesWritten adds bytes to the running count of bytes this
// opaque has persisted to the writer, for disk-stats reporting.
func (i *invertedIndexOpaque) incrementBytesWritten(bytes uint64) {
	i.bytesWritten = i.bytesWritten + bytes
}

// BytesWritten reports the total number of bytes this opaque has
// written out so far (see incrementBytesWritten).
func (i *invertedIndexOpaque) BytesWritten() uint64 {
	total := i.bytesWritten
	return total
}

// BytesRead always reports zero: this opaque only tracks writes during
// persistence; read accounting is handled elsewhere.
func (i *invertedIndexOpaque) BytesRead() uint64 { return 0 }

// ResetBytesRead is a no-op; this opaque does not maintain a
// bytes-read counter (BytesRead always returns 0).
func (i *invertedIndexOpaque) ResetBytesRead(uint64) {
}

func (io *invertedIndexOpaque) writeDicts(w *CountHashWriter) (dictOffsets []uint64, err error) {

if io.results == nil || len(io.results) == 0 {
Expand Down Expand Up @@ -494,6 +508,8 @@ func (io *invertedIndexOpaque) writeDicts(w *CountHashWriter) (dictOffsets []uin

tfEncoder.Close()
locEncoder.Close()
io.incrementBytesWritten(locEncoder.getBytesWritten())
io.incrementBytesWritten(tfEncoder.getBytesWritten())

postingsOffset, err :=
writePostings(postingsBS, tfEncoder, locEncoder, nil, w, buf)
Expand Down Expand Up @@ -529,6 +545,8 @@ func (io *invertedIndexOpaque) writeDicts(w *CountHashWriter) (dictOffsets []uin
return nil, err
}

io.incrementBytesWritten(uint64(len(vellumData)))

// write this vellum to disk
_, err = w.Write(vellumData)
if err != nil {
Expand Down Expand Up @@ -565,6 +583,8 @@ func (io *invertedIndexOpaque) writeDicts(w *CountHashWriter) (dictOffsets []uin
return nil, err
}

io.incrementBytesWritten(fdvEncoder.getBytesWritten())

fdvOffsetsStart[fieldID] = uint64(w.Count())

_, err = fdvEncoder.Write()
Expand Down Expand Up @@ -909,8 +929,9 @@ type invertedIndexOpaque struct {

fieldAddrs map[int]int

fieldsSame bool
numDocs uint64
bytesWritten uint64
fieldsSame bool
numDocs uint64
}

func (io *invertedIndexOpaque) Reset() (err error) {
Expand Down
54 changes: 37 additions & 17 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (s *Segment) loadConfig() error {

// 8*4 + 4*3 = 44 bytes being accounted from all the offsets
// above being read from the file
s.incrementBytesRead(44)
s.incrementBytesRead(uint64(footerSize))
s.SegmentBase.mem = s.mm[:len(s.mm)-footerSize]
return nil
}
Expand Down Expand Up @@ -322,11 +322,14 @@ func (s *SegmentBase) loadFieldsNew() error {
// read the number of fields
numFields, sz := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
pos += uint64(sz)
s.incrementBytesRead(uint64(sz))

var fieldID uint64

for fieldID < numFields {
addr := binary.BigEndian.Uint64(s.mem[pos : pos+8])
s.incrementBytesRead(8)

fieldSectionMap := make(map[uint16]uint64)

err := s.loadFieldNew(uint16(fieldID), addr, fieldSectionMap)
Expand All @@ -342,22 +345,23 @@ func (s *SegmentBase) loadFieldsNew() error {
return nil
}

func (s *SegmentBase) loadFieldNew(fieldID uint16, addr uint64,
func (s *SegmentBase) loadFieldNew(fieldID uint16, pos uint64,
fieldSectionMap map[uint16]uint64) error {
if addr == 0 {
if pos == 0 {
// there is no indexing structure present for this field/section
return nil
}
pos := addr

fieldStartPos := pos // to track the number of bytes read
fieldNameLen, sz := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
pos += uint64(sz)

fieldName := string(s.mem[pos : pos+fieldNameLen])
pos += fieldNameLen

s.fieldsInv = append(s.fieldsInv, fieldName)
s.fieldsMap[fieldName] = uint16(fieldID + 1)

pos += fieldNameLen

fieldNumSections, sz := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
pos += uint64(sz)

Expand All @@ -376,17 +380,24 @@ func (s *SegmentBase) loadFieldNew(fieldID uint16, addr uint64,
s.dictLocs = append(s.dictLocs, 0)
continue
}

read := 0
// skip the doc values
_, n := binary.Uvarint(s.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64])
fieldSectionAddr += uint64(n)
read += n
_, n = binary.Uvarint(s.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64])
fieldSectionAddr += uint64(n)
dictLoc, _ := binary.Uvarint(s.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64])

read += n
dictLoc, n := binary.Uvarint(s.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64])
// account the bytes read while parsing the field's inverted index section
s.incrementBytesRead(uint64(read + n))
s.dictLocs = append(s.dictLocs, dictLoc)
}
}

// account the bytes read while parsing the sections field index.
s.incrementBytesRead((pos - uint64(fieldStartPos)) + fieldNameLen)
return nil
}

Expand Down Expand Up @@ -711,7 +722,8 @@ func (s *Segment) getSectionDvOffsets(fieldID int, secID uint16) (uint64, uint64
return 0, 0, 0, fmt.Errorf("loadDvReaders: failed to read the docvalue offset end for field %d", fieldID)
}
read += uint64(n)
// bytes read increment to be done here

s.incrementBytesRead(read)
}

return fieldLocStart, fieldLocEnd, 0, nil
Expand Down Expand Up @@ -746,8 +758,9 @@ func (s *Segment) loadDvReadersLegacy() error {
if s.docValueOffset == fieldNotUninverted {
return nil
}
var read uint64

for fieldID := range s.fieldsInv {
var read uint64
start, n := binary.Uvarint(s.mem[s.docValueOffset+read : s.docValueOffset+read+binary.MaxVarintLen64])
if n <= 0 {
return fmt.Errorf("loadDvReaders: failed to read the docvalue offset start for field %d", fieldID)
Expand All @@ -758,6 +771,7 @@ func (s *Segment) loadDvReadersLegacy() error {
return fmt.Errorf("loadDvReaders: failed to read the docvalue offset end for field %d", fieldID)
}
read += uint64(n)
s.incrementBytesRead(read)

fieldDvReader, err := s.loadFieldDocValueReader(s.fieldsInv[fieldID], start, end)
if err != nil {
Expand Down Expand Up @@ -818,24 +832,30 @@ func (s *SegmentBase) loadDvReaders() error {
if secOffset > 0 {
// fixed encoding as of now, need to uvarint this
pos := secOffset
fieldLocStart, read := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
if read <= 0 {
var read uint64
fieldLocStart, n := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
if n <= 0 {
return fmt.Errorf("loadDvReaders: failed to read the docvalue offset start for field %v", s.fieldsInv[fieldID])
}
pos += uint64(read)
fieldLocEnd, read := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
pos += uint64(n)
read += uint64(n)
fieldLocEnd, n := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
if read <= 0 {
return fmt.Errorf("loadDvReaders: failed to read the docvalue offset end for field %v", s.fieldsInv[fieldID])
}
pos += uint64(read)
pos += uint64(n)
read += uint64(n)

dataLoc, read := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
if read <= 0 {
s.incrementBytesRead(read)

dataLoc, n := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
if n <= 0 {
return fmt.Errorf("loadDvReaders: failed to read the dataLoc "+
"offset for sectionID %v field %v", secID, s.fieldsInv[fieldID])
}
if secID == SectionInvertedTextIndex {
s.dictLocs = append(s.dictLocs, dataLoc)
s.incrementBytesRead(uint64(n))
}
fieldDvReader, err := s.loadFieldDocValueReader(s.fieldsInv[fieldID], fieldLocStart, fieldLocEnd)
if err != nil {
Expand Down

0 comments on commit 480f41d

Please sign in to comment.