From 7404758cbd4cd36199df94117a5e709bba0b1f5c Mon Sep 17 00:00:00 2001
From: Thejas-bhat <35959007+Thejas-bhat@users.noreply.github.com>
Date: Thu, 30 Nov 2023 23:45:31 +0530
Subject: [PATCH 1/2] fixing IO stats computation (#188)

---
 new.go                         |  8 ++++++
 section_faiss_vector_index.go  | 18 ++++++++++++
 section_inverted_text_index.go | 25 +++++++++++++++--
 segment.go                     | 51 +++++++++++++++++++++++-----------
 4 files changed, 84 insertions(+), 18 deletions(-)

diff --git a/new.go b/new.go
index 57d24c15..74a8e0dd 100644
--- a/new.go
+++ b/new.go
@@ -215,6 +215,14 @@ func (s *interim) convert() (uint64, []uint64, uint64, error) {
 		}
 	}
 
+	// after persisting the sections to the writer, account for the corresponding bytes written
+	for _, opaque := range s.opaque {
+		opaqueIO, ok := opaque.(segment.DiskStatsReporter)
+		if ok {
+			s.incrementBytesWritten(opaqueIO.BytesWritten())
+		}
+	}
+
 	if len(s.results) == 0 {
 		dictOffsets = make([]uint64, len(s.FieldsInv))
 	}

diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go
index f4334998..e05c16ab 100644
--- a/section_faiss_vector_index.go
+++ b/section_faiss_vector_index.go
@@ -510,6 +510,8 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint
 			}
 		}
 
+		// accounts for whatever data has been written out to the writer.
+		vo.incrementBytesWritten(uint64(w.Count() - fieldStart))
 		vo.fieldAddrs[fieldID] = fieldStart
 	}
 	return 0, nil
@@ -613,6 +615,8 @@ type vecInfo struct {
 type vectorIndexOpaque struct {
 	init bool
 
+	bytesWritten uint64
+
 	fieldAddrs map[uint16]int
 	vecIDMap   map[int64]vecInfo
 
@@ -621,6 +625,20 @@ type vectorIndexOpaque struct {
 
 	tmp0 []byte
 }
 
+func (v *vectorIndexOpaque) incrementBytesWritten(val uint64) {
+	v.bytesWritten += val
+}
+
+func (v *vectorIndexOpaque) BytesWritten() uint64 {
+	return v.bytesWritten
+}
+
+func (v *vectorIndexOpaque) BytesRead() uint64 {
+	return 0
+}
+
+func (v *vectorIndexOpaque) ResetBytesRead(uint64) {}
+
 func (vo *vectorIndexOpaque) Reset() (err error) {
 	// cleanup stuff over here
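Aside: a minimal, self-contained sketch of the accounting pattern the hunks above introduce. Each section opaque keeps its own bytesWritten counter, and convert() aggregates whichever opaques implement the reporter interface. The types below are simplified stand-ins for zap's segment.DiskStatsReporter and the section opaques, not the real definitions.

    package main

    import "fmt"

    // simplified stand-in for zap's segment.DiskStatsReporter
    type DiskStatsReporter interface {
        BytesWritten() uint64
        BytesRead() uint64
        ResetBytesRead(uint64)
    }

    // simplified stand-in for a section opaque that tracks its own writes
    type sectionOpaque struct{ bytesWritten uint64 }

    func (s *sectionOpaque) BytesWritten() uint64  { return s.bytesWritten }
    func (s *sectionOpaque) BytesRead() uint64     { return 0 }
    func (s *sectionOpaque) ResetBytesRead(uint64) {}

    func main() {
        // one entry deliberately does not implement the interface
        opaques := []interface{}{&sectionOpaque{128}, &sectionOpaque{512}, struct{}{}}

        var total uint64
        for _, o := range opaques {
            // the type assertion mirrors the patched convert() loop: only
            // sections that implement the reporter contribute to the total
            if r, ok := o.(DiskStatsReporter); ok {
                total += r.BytesWritten()
            }
        }
        fmt.Println(total) // 640
    }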
diff --git a/section_inverted_text_index.go b/section_inverted_text_index.go
index 7d952cb6..e1989055 100644
--- a/section_inverted_text_index.go
+++ b/section_inverted_text_index.go
@@ -379,6 +379,20 @@ func (i *invertedIndexOpaque) grabBuf(size int) []byte {
 	return buf[:size]
 }
 
+func (i *invertedIndexOpaque) incrementBytesWritten(bytes uint64) {
+	i.bytesWritten += bytes
+}
+
+func (i *invertedIndexOpaque) BytesWritten() uint64 {
+	return i.bytesWritten
+}
+
+func (i *invertedIndexOpaque) BytesRead() uint64 {
+	return 0
+}
+
+func (i *invertedIndexOpaque) ResetBytesRead(uint64) {}
+
 func (io *invertedIndexOpaque) writeDicts(w *CountHashWriter) (dictOffsets []uint64, err error) {
 
 	if io.results == nil || len(io.results) == 0 {
@@ -494,6 +508,8 @@ func (io *invertedIndexOpaque) writeDicts(w *CountHashWriter) (dictOffsets []uin
 
 			tfEncoder.Close()
 			locEncoder.Close()
+			io.incrementBytesWritten(locEncoder.getBytesWritten())
+			io.incrementBytesWritten(tfEncoder.getBytesWritten())
 
 			postingsOffset, err :=
 				writePostings(postingsBS, tfEncoder, locEncoder, nil, w, buf)
@@ -529,6 +545,8 @@ func (io *invertedIndexOpaque) writeDicts(w *CountHashWriter) (dictOffsets []uin
 			return nil, err
 		}
 
+		io.incrementBytesWritten(uint64(len(vellumData)))
+
 		// write this vellum to disk
 		_, err = w.Write(vellumData)
 		if err != nil {
@@ -565,6 +583,8 @@ func (io *invertedIndexOpaque) writeDicts(w *CountHashWriter) (dictOffsets []uin
 				return nil, err
 			}
 
+			io.incrementBytesWritten(fdvEncoder.getBytesWritten())
+
 			fdvOffsetsStart[fieldID] = uint64(w.Count())
 
 			_, err = fdvEncoder.Write()
@@ -911,8 +931,9 @@ type invertedIndexOpaque struct {
 
 	fieldAddrs map[int]int
 
-	fieldsSame bool
-	numDocs    uint64
+	bytesWritten uint64
+	fieldsSame   bool
+	numDocs      uint64
 }
 
 func (io *invertedIndexOpaque) Reset() (err error) {

diff --git a/segment.go b/segment.go
index 973ae883..97425955 100644
--- a/segment.go
+++ b/segment.go
@@ -234,7 +234,7 @@ func (s *Segment) loadConfig() error {
 
 	// 8*4 + 4*3 = 44 bytes being accounted from all the offsets
 	// above being read from the file
-	s.incrementBytesRead(44)
+	s.incrementBytesRead(uint64(footerSize))
 	s.SegmentBase.mem = s.mm[:len(s.mm)-footerSize]
 	return nil
 }
@@ -322,11 +322,13 @@ func (s *SegmentBase) loadFieldsNew() error {
 	// read the number of fields
 	numFields, sz := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
 	pos += uint64(sz)
+	s.incrementBytesRead(uint64(sz))
 
 	var fieldID uint64
 
 	for fieldID < numFields {
 		addr := binary.BigEndian.Uint64(s.mem[pos : pos+8])
+		s.incrementBytesRead(8)
 
 		fieldSectionMap := make(map[uint16]uint64)
 
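Aside: the segment-loading hunks here all follow the same read-and-account pattern — binary.Uvarint reports how many bytes it consumed, and exactly that count feeds incrementBytesRead. A small runnable sketch of the pattern (values are made up for illustration):

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    func main() {
        // encode two uvarints the way a field header might be laid out
        buf := binary.AppendUvarint(nil, 300) // e.g. a field name length
        buf = binary.AppendUvarint(buf, 7)    // e.g. a section count

        var pos, bytesRead uint64

        v1, n := binary.Uvarint(buf[pos:])
        pos += uint64(n)
        bytesRead += uint64(n) // account exactly what was parsed

        v2, n := binary.Uvarint(buf[pos:])
        pos += uint64(n)
        bytesRead += uint64(n)

        fmt.Println(v1, v2, bytesRead) // 300 7 3
    }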
@@ -344,18 +346,19 @@
 	return nil
 }
 
-func (s *SegmentBase) loadFieldNew(fieldID uint16, addr uint64,
+func (s *SegmentBase) loadFieldNew(fieldID uint16, pos uint64,
 	fieldSectionMap map[uint16]uint64) error {
-	pos := addr
+
+	fieldStartPos := pos // to track the number of bytes read
 
 	fieldNameLen, sz := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
 	pos += uint64(sz)
 
 	fieldName := string(s.mem[pos : pos+fieldNameLen])
+	pos += fieldNameLen
+
 	s.fieldsInv = append(s.fieldsInv, fieldName)
 	s.fieldsMap[fieldName] = uint16(fieldID + 1)
 
-	pos += fieldNameLen
-
 	fieldNumSections, sz := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
 	pos += uint64(sz)
@@ -374,17 +377,24 @@ func (s *SegmentBase) loadFieldNew(fieldID uint16, addr uint64,
 			s.dictLocs = append(s.dictLocs, 0)
 			continue
 		}
+
+		read := 0
 		// skip the doc values
 		_, n := binary.Uvarint(s.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64])
 		fieldSectionAddr += uint64(n)
+		read += n
 		_, n = binary.Uvarint(s.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64])
 		fieldSectionAddr += uint64(n)
-		dictLoc, _ := binary.Uvarint(s.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64])
-
+		read += n
+		dictLoc, n := binary.Uvarint(s.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64])
+		// account for the bytes read while parsing the field's inverted index section
+		s.incrementBytesRead(uint64(read + n))
 		s.dictLocs = append(s.dictLocs, dictLoc)
 		}
 	}
 
+	// account for the bytes read while parsing the section's field index.
+	s.incrementBytesRead((pos - uint64(fieldStartPos)) + fieldNameLen)
 	return nil
 }
@@ -709,7 +719,8 @@ func (s *Segment) getSectionDvOffsets(fieldID int, secID uint16) (uint64, uint64
 			return 0, 0, 0, fmt.Errorf("loadDvReaders: failed to read the docvalue offset end for field %d", fieldID)
 		}
 		read += uint64(n)
-		// bytes read increment to be done here
+
+		s.incrementBytesRead(read)
 	}
 
 	return fieldLocStart, fieldLocEnd, 0, nil
@@ -744,8 +755,9 @@ func (s *Segment) loadDvReadersLegacy() error {
 	if s.docValueOffset == fieldNotUninverted {
 		return nil
 	}
-	var read uint64
+	var read, fieldStart uint64
 	for fieldID := range s.fieldsInv {
+		fieldStart = read // read is cumulative across fields; track this field's share
 		start, n := binary.Uvarint(s.mem[s.docValueOffset+read : s.docValueOffset+read+binary.MaxVarintLen64])
 		if n <= 0 {
 			return fmt.Errorf("loadDvReaders: failed to read the docvalue offset start for field %d", fieldID)
 		}
@@ -756,6 +768,7 @@ func (s *Segment) loadDvReadersLegacy() error {
 			return fmt.Errorf("loadDvReaders: failed to read the docvalue offset end for field %d", fieldID)
 		}
 		read += uint64(n)
+		s.incrementBytesRead(read - fieldStart)
 
 		fieldDvReader, err := s.loadFieldDocValueReader(s.fieldsInv[fieldID], start, end)
 		if err != nil {
@@ -816,24 +829,30 @@ func (s *SegmentBase) loadDvReaders() error {
 		if secOffset > 0 {
 			// fixed encoding as of now, need to uvarint this
 			pos := secOffset
-			fieldLocStart, read := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
-			if read <= 0 {
+			var read uint64
+			fieldLocStart, n := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
+			if n <= 0 {
 				return fmt.Errorf("loadDvReaders: failed to read the docvalue offset start for field %v", s.fieldsInv[fieldID])
 			}
-			pos += uint64(read)
-			fieldLocEnd, read := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
-			if read <= 0 {
+			pos += uint64(n)
+			read += uint64(n)
+			fieldLocEnd, n := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
+			if n <= 0 {
 				return fmt.Errorf("loadDvReaders: failed to read the docvalue offset end for field %v", s.fieldsInv[fieldID])
 			}
-			pos += uint64(read)
+			pos += uint64(n)
+			read += uint64(n)
 
-			dataLoc, read := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
-			if read <= 0 {
+			s.incrementBytesRead(read)
+
+			dataLoc, n := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
+			if n <= 0 {
 				return fmt.Errorf("loadDvReaders: failed to read the dataLoc "+
 					"offset for sectionID %v field %v", secID, s.fieldsInv[fieldID])
 			}
 			if secID == SectionInvertedTextIndex {
 				s.dictLocs = append(s.dictLocs, dataLoc)
+				s.incrementBytesRead(uint64(n))
 			}
 			fieldDvReader, err := s.loadFieldDocValueReader(s.fieldsInv[fieldID], fieldLocStart, fieldLocEnd)
 			if err != nil {

From acaf783c70a66f6b56c35943527a6bce674b07a9 Mon Sep 17 00:00:00 2001
From: Thejas-bhat <35959007+Thejas-bhat@users.noreply.github.com>
Date: Thu, 30 Nov 2023 23:45:51 +0530
Subject: [PATCH 2/2] bug fix: fixing the logic of tracking vector ids during a merge (#187)

* bug fix: correcting the valid vector ids being tracked during merge

* fixing the decoding of segment data in the vector cmd tool

* bug fix: shift the docNum by 32 to account for signed score vals
---
 cmd/zap/cmd/vector.go         | 12 +++++++-
 faiss_vector_posting.go       |  7 +++--
 faiss_vector_test.go          |  5 ++--
 section_faiss_vector_index.go | 52 ++++++++++++++++++++++++++++-------
 4 files changed, 61 insertions(+), 15 deletions(-)

diff --git a/cmd/zap/cmd/vector.go b/cmd/zap/cmd/vector.go
index dc72439d..df2bf5ab 100644
--- a/cmd/zap/cmd/vector.go
+++ b/cmd/zap/cmd/vector.go
@@ -135,9 +135,19 @@ func decodeSection(data []byte, start uint64) (int, int, map[int64][]uint32, *fa
 	numVecs, n := binary.Uvarint(data[pos : pos+binary.MaxVarintLen64])
 	pos += n
 
 	for i := 0; i < int(numVecs); i++ {
-		vecID, n := binary.Uvarint(data[pos : pos+binary.MaxVarintLen64])
+		vecID, n := binary.Varint(data[pos : pos+binary.MaxVarintLen64])
 		pos += n
 
+		numDocs, n := binary.Uvarint(data[pos : pos+binary.MaxVarintLen64])
+		pos += n
+
+		if numDocs == 1 {
+			docID, n := binary.Uvarint(data[pos : pos+binary.MaxVarintLen64])
+			pos += n
+			vecDocIDMap[int64(vecID)] = []uint32{uint32(docID)}
+			continue
+		}
+
 		bitMapLen, n := binary.Uvarint(data[pos : pos+binary.MaxVarintLen64])
 		pos += n
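Aside: a hedged sketch of the per-vector layout the updated decodeSection above walks. Vector IDs are signed 64-bit values in faiss, hence binary.Varint rather than Uvarint; a doc count of 1 short-circuits to a bare docID instead of a serialized bitmap. The encoding below mirrors only what the decoder shows and the names are illustrative, not zap's:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    func main() {
        // encode one vector entry the way the decoder above expects it
        var buf []byte
        buf = binary.AppendVarint(buf, 42) // vecID: signed varint
        buf = binary.AppendUvarint(buf, 1) // numDocs == 1 triggers the fast path
        buf = binary.AppendUvarint(buf, 7) // the single docID, no bitmap needed

        pos := 0
        vecID, n := binary.Varint(buf[pos:]) // Varint, not Uvarint
        pos += n
        numDocs, n := binary.Uvarint(buf[pos:])
        pos += n
        if numDocs == 1 {
            docID, n := binary.Uvarint(buf[pos:])
            pos += n
            fmt.Println(vecID, docID, pos) // 42 7 3
        }
    }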
diff --git a/faiss_vector_posting.go b/faiss_vector_posting.go
index 133e522f..9582aab9 100644
--- a/faiss_vector_posting.go
+++ b/faiss_vector_posting.go
@@ -216,7 +216,7 @@ func (i *VecPostingsIterator) nextCodeAtOrAfter(atOrAfter uint64) (uint64, bool,
 // a transformation function which stores both the score and the docNum as a single
 // entry which is a uint64 number.
 func getVectorCode(docNum uint32, score float32) uint64 {
-	return uint64(docNum)<<31 | uint64(math.Float32bits(score))
+	return uint64(docNum)<<32 | uint64(math.Float32bits(score))
 }
 
 // Next returns the next posting on the vector postings list, or nil at the end
@@ -234,7 +234,7 @@ func (i *VecPostingsIterator) nextAtOrAfter(atOrAfter uint64) (segment.VecPostin
 	i.next = VecPosting{} // clear the struct
 	rv := &i.next
 	rv.score = math.Float32frombits(uint32(code & maskLow32Bits))
-	rv.docNum = code >> 31
+	rv.docNum = code >> 32
 
 	return rv, nil
 }
@@ -352,6 +352,9 @@ func (sb *SegmentBase) SimilarVectors(field string, qVector []float32, k int64,
 		// docID and the score to the newly created vecPostingsList
 		for i := 0; i < len(ids); i++ {
 			vecID := ids[i]
+			if vecID == -1 {
+				continue // ignore the invalid entries of ids
+			}
 			docIDs := vecDocIDMap[vecID]
 
 			var code uint64

diff --git a/faiss_vector_test.go b/faiss_vector_test.go
index 52773b6c..e6f6d32e 100644
--- a/faiss_vector_test.go
+++ b/faiss_vector_test.go
@@ -17,10 +17,11 @@ import (
 )
 
 func getStubDocScores(k int) (ids []uint64, scores []float32, err error) {
-	for i := 1; i <= k; i++ {
+	for i := 0; i < k; i++ {
 		ids = append(ids, uint64(i))
-		scores = append(scores, float32((2*i+3)/200))
+		scores = append(scores, float32(2*i+3)/200)
 	}
+	scores[0] = -scores[0]
 	return ids, scores, nil
 }
@@ -37,7 +38,7 @@ func TestVecPostingsIterator(t *testing.T) {
 
 	docIDs := make(map[uint64]float32)
 	for i, id := range ids {
-		code := uint64(id)<<31 | uint64(math.Float32bits(scores[i]))
+		code := uint64(id)<<32 | uint64(math.Float32bits(scores[i]))
 		vecPL.postings.Add(code)
 		docIDs[id] = scores[i]
 	}
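Aside: why the shift moved from 31 to 32. math.Float32bits of a negative score has bit 31 set, and with only a 31-bit shift that sign bit lands in the same position as the docNum's lowest bit, corrupting the decoded document number. A small demonstration of the collision:

    package main

    import (
        "fmt"
        "math"
    )

    // pack mirrors the fixed getVectorCode: docNum in the high 32 bits,
    // raw float32 score bits in the low 32 bits.
    func pack(docNum uint32, score float32) uint64 {
        return uint64(docNum)<<32 | uint64(math.Float32bits(score))
    }

    func main() {
        const maskLow32Bits = 0xffffffff

        code := pack(4, -0.75) // a negative score sets bit 31 of the low half
        fmt.Println(code>>32, math.Float32frombits(uint32(code&maskLow32Bits))) // 4 -0.75

        // with the old 31-bit shift, the score's sign bit collides with the
        // docNum's lowest bit and the document number decodes wrong:
        old := uint64(4)<<31 | uint64(math.Float32bits(float32(-0.75)))
        fmt.Println(old >> 31) // 5, not 4
    }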
diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go
index e05c16ab..c90921d4 100644
--- a/section_faiss_vector_index.go
+++ b/section_faiss_vector_index.go
@@ -62,6 +62,7 @@ type vecIndexMeta struct {
 	startOffset int
 	indexSize   uint64
 	vecIds      []int64
+	deleted     []int64
 }
 
 func remapDocIDs(oldIDs *roaring.Bitmap, newIDs []uint64) *roaring.Bitmap {
@@ -153,18 +154,20 @@ LOOP:
 			}
 
 			// remap the docIDs from the old segment to the new document nos.
-			// provided.
+			// provided. furthermore, this function also drops that segment's
+			// invalid doc nums from the resulting bitmap.
 			bitMap = remapDocIDs(bitMap, newDocNumsIn[segI])
 			if vecToDocID[vecID] == nil {
-				// if there are some tombstone entries in the docIDs, marked
-				// in the drops[ith-segment] bitmap, don't include them in the
-				// final bitmap.
-				if drops[segI] != nil && !drops[segI].IsEmpty() {
-					vecToDocID[vecID] = roaring.AndNot(bitMap, drops[segI])
-				} else {
+				// if the remapped bitmap has valid docs as entries, track it
+				// as part of the vecs to be reconstructed (for larger indexes).
+				// otherwise, since there are no docs present for this vecID,
+				// delete it from the specific vector index later on.
+				if bitMap.GetCardinality() > 0 {
 					vecToDocID[vecID] = bitMap
+					indexes[len(indexes)-1].vecIds = append(indexes[len(indexes)-1].vecIds, vecID)
+				} else {
+					indexes[len(indexes)-1].deleted = append(indexes[len(indexes)-1].deleted, vecID)
 				}
-				indexes[len(indexes)-1].vecIds = append(indexes[len(indexes)-1].vecIds, vecID)
 			} else {
 				vecToDocID[vecID].Or(bitMap)
 			}
@@ -252,6 +255,19 @@ func (v *vectorIndexOpaque) flushVectorSection(vecToDocID map[int64]*roaring.Bit
 	return fieldStart, nil
 }
 
+func removeDeletedVectors(index *faiss.IndexImpl, ids []int64) error {
+	sel, err := faiss.NewIDSelectorBatch(ids)
+	if err != nil {
+		return err
+	}
+
+	_, err = index.RemoveIDs(sel)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
 // todo: naive implementation. need to keep in mind the perf implications and improve on this.
 // perhaps, parallelized merging can help speed things up over here.
 func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*SegmentBase,
@@ -275,7 +291,9 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*Segme
 	indexType, isIVF := getIndexType(len(vecToDocID))
 	if isIVF {
 		// merging of more complex index types (for eg ivf family) with reconstruction
-		// method.
+		// method. the indexes[i].vecIds is such that it has only the valid vecs
+		// of this vector index present in it, so we'd reconstruct only the
+		// valid ones.
 		var indexData []float32
 		for i := 0; i < len(vecIndexes); i++ {
 			if isClosed(closeCh) {
@@ -329,12 +347,26 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*Segme
 		}
 	} else {
 		// todo: ivf -> flat index when there were huge number of vector deletes for this field
+		// first flush out the invalid vecs in the first index, if any, before
+		// the merge
 		var err error
+		if len(indexes[0].deleted) > 0 {
+			err = removeDeletedVectors(vecIndexes[0], indexes[0].deleted)
+			if err != nil {
+				return err
+			}
+		}
 		for i := 1; i < len(vecIndexes); i++ {
 			if isClosed(closeCh) {
 				return fmt.Errorf("merging of vector sections aborted")
 			}
-			err := vecIndexes[0].MergeFrom(vecIndexes[i], 0)
+			if len(indexes[i].deleted) > 0 {
+				err = removeDeletedVectors(vecIndexes[i], indexes[i].deleted)
+				if err != nil {
+					return err
+				}
+			}
+			err = vecIndexes[0].MergeFrom(vecIndexes[i], 0)
 			if err != nil {
 				return err
 			}
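Aside: a rough stand-in for the classification rule the merge loop above now applies. Once remapDocIDs has dropped a segment's deleted documents, a vector whose bitmap is empty has no live references left, so it goes on the meta's deleted list (to be removed from the faiss index via removeDeletedVectors), while the rest are tracked in vecIds for reconstruction. The map contents below are invented for illustration:

    package main

    import (
        "fmt"

        "github.com/RoaringBitmap/roaring"
    )

    func main() {
        remapped := map[int64]*roaring.Bitmap{
            11: roaring.BitmapOf(3, 8), // still referenced by live docs
            12: roaring.New(),          // all of its docs were dropped
        }

        var vecIds, deleted []int64
        for vecID, bm := range remapped {
            if bm.GetCardinality() > 0 {
                vecIds = append(vecIds, vecID) // reconstruct into the merged index
            } else {
                deleted = append(deleted, vecID) // remove from the faiss index later
            }
        }
        fmt.Println(len(vecIds), len(deleted)) // 1 1
    }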