Skip to content

Commit

Permalink
Bug fixes and Optimizations (#267)
Browse files Browse the repository at this point in the history
- Fix issue where `vecIDsToExclude` is always nil if the cache entry
exists, allowing deleted documents to be returned.
- Optimize vector cache logic by checking for a cache entry after
acquiring a write lock to reduce redundant work during concurrent
access.
- Resolve issue of duplicate entries being added to `dictLocs` when
constructing an in-memory segment.
- Clean up code by removing the unused method parameters that were being
passed around.
  • Loading branch information
CascadingRadium authored Oct 9, 2024
1 parent 1c5b688 commit 75f3dcf
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 49 deletions.
12 changes: 6 additions & 6 deletions build.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,24 +157,24 @@ func persistStoredFieldValues(fieldID int,
return curr, data, nil
}

func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32,
fieldsMap map[string]uint16, fieldsInv []string, numDocs uint64,
storedIndexOffset uint64, dictLocs []uint64,
sectionsIndexOffset uint64) (*SegmentBase, error) {
func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64,
storedIndexOffset uint64, sectionsIndexOffset uint64) (*SegmentBase, error) {
sb := &SegmentBase{
mem: mem,
memCRC: memCRC,
chunkMode: chunkMode,
fieldsMap: fieldsMap,
numDocs: numDocs,
storedIndexOffset: storedIndexOffset,
fieldsIndexOffset: sectionsIndexOffset,
sectionsIndexOffset: sectionsIndexOffset,
fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)),
docValueOffset: 0, // docValueOffsets identified automatically by the section
dictLocs: dictLocs,
fieldFSTs: make(map[uint16]*vellum.FST),
vecIndexCache: newVectorIndexCache(),
// following fields gets populated by loadFieldsNew
fieldsMap: make(map[string]uint16),
dictLocs: make([]uint64, 0),
fieldsInv: make([]string, 0),
}
sb.updateSize()

Expand Down
15 changes: 14 additions & 1 deletion faiss_vector_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func (vc *vectorIndexCache) loadFromCache(fieldID uint16, loadDocVecIDMap bool,

entry, ok := vc.cache[fieldID]
if ok {
vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except)
index, vecDocIDMap, docVecIDMap = entry.load()
vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except)
if !loadDocVecIDMap || (loadDocVecIDMap && len(entry.docVecIDMap) > 0) {
vc.m.RUnlock()
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
Expand Down Expand Up @@ -120,6 +120,19 @@ func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte,
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32,
docVecIDMap map[uint32][]int64, vecIDsToExclude []int64, err error) {

// Handle concurrent accesses (to avoid unnecessary work) by adding a
// check within the write lock here.
entry := vc.cache[fieldID]
if entry != nil {
index, vecDocIDMap, docVecIDMap = entry.load()
vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except)
if !loadDocVecIDMap || (loadDocVecIDMap && len(entry.docVecIDMap) > 0) {
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}
docVecIDMap = vc.addDocVecIDMapToCacheLOCKED(entry)
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}

// if the cache doesn't have the entry, construct the vector to doc id map and
// the vector index out of the mem bytes and update the cache under lock.
pos := 0
Expand Down
22 changes: 10 additions & 12 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat
// wrap it for counting (tracking offsets)
cr := NewCountHashWriterWithStatsReporter(br, s)

newDocNums, numDocs, storedIndexOffset, _, _, _, sectionsIndexOffset, err :=
MergeToWriter(segmentBases, drops, chunkMode, cr, closeCh)
newDocNums, numDocs, storedIndexOffset, _, _, sectionsIndexOffset, err :=
mergeToWriter(segmentBases, drops, chunkMode, cr, closeCh)
if err != nil {
cleanup()
return nil, 0, err
Expand Down Expand Up @@ -109,9 +109,9 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat
return newDocNums, uint64(cr.Count()), nil
}

func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
chunkMode uint32, cr *CountHashWriter, closeCh chan struct{}) (
newDocNums [][]uint64, numDocs, storedIndexOffset uint64, dictLocs []uint64,
newDocNums [][]uint64, numDocs, storedIndexOffset uint64,
fieldsInv []string, fieldsMap map[string]uint16, sectionsIndexOffset uint64,
err error) {

Expand All @@ -122,7 +122,7 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
numDocs = computeNewDocCount(segments, drops)

if isClosed(closeCh) {
return nil, 0, 0, nil, nil, nil, 0, seg.ErrClosed
return nil, 0, 0, nil, nil, 0, seg.ErrClosed
}

// the merge opaque is especially important when it comes to tracking the file
Expand All @@ -140,7 +140,7 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh)
if err != nil {
return nil, 0, 0, nil, nil, nil, 0, err
return nil, 0, 0, nil, nil, 0, err
}

// at this point, ask each section implementation to merge itself
Expand All @@ -149,21 +149,19 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,

err = x.Merge(mergeOpaque, segments, drops, fieldsInv, newDocNums, cr, closeCh)
if err != nil {
return nil, 0, 0, nil, nil, nil, 0, err
return nil, 0, 0, nil, nil, 0, err
}
}
} else {
dictLocs = make([]uint64, len(fieldsInv))
}

// we can persist the fields section index now, this will point
// to the various indexes (each in different section) available for a field.
sectionsIndexOffset, err = persistFieldsSection(fieldsInv, cr, dictLocs, mergeOpaque)
sectionsIndexOffset, err = persistFieldsSection(fieldsInv, cr, mergeOpaque)
if err != nil {
return nil, 0, 0, nil, nil, nil, 0, err
return nil, 0, 0, nil, nil, 0, err
}

return newDocNums, numDocs, storedIndexOffset, dictLocs, fieldsInv, fieldsMap, sectionsIndexOffset, nil
return newDocNums, numDocs, storedIndexOffset, fieldsInv, fieldsMap, sectionsIndexOffset, nil
}

// mapFields takes the fieldsInv list and returns a map of fieldName
Expand Down
33 changes: 15 additions & 18 deletions new.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,13 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document,
s.chunkMode = chunkMode
s.w = NewCountHashWriter(&br)

storedIndexOffset, dictOffsets, sectionsIndexOffset, err := s.convert()
storedIndexOffset, sectionsIndexOffset, err := s.convert()
if err != nil {
return nil, uint64(0), err
}

sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkMode,
s.FieldsMap, s.FieldsInv, uint64(len(results)),
storedIndexOffset, dictOffsets, sectionsIndexOffset)
uint64(len(results)), storedIndexOffset, sectionsIndexOffset)

// get the bytes written before the interim's reset() call
// write it to the newly formed segment base.
Expand Down Expand Up @@ -125,8 +124,10 @@ func (s *interim) reset() (err error) {
s.results = nil
s.chunkMode = 0
s.w = nil
s.FieldsMap = nil
s.FieldsInv = nil
for k := range s.FieldsMap {
delete(s.FieldsMap, k)
}
s.FieldsInv = s.FieldsInv[:0]
s.metaBuf.Reset()
s.tmp0 = s.tmp0[:0]
s.tmp1 = s.tmp1[:0]
Expand Down Expand Up @@ -168,8 +169,10 @@ type interimLoc struct {
arrayposs []uint64
}

func (s *interim) convert() (uint64, []uint64, uint64, error) {
s.FieldsMap = map[string]uint16{}
func (s *interim) convert() (uint64, uint64, error) {
if s.FieldsMap == nil {
s.FieldsMap = map[string]uint16{}
}

args := map[string]interface{}{
"results": s.results,
Expand Down Expand Up @@ -209,17 +212,15 @@ func (s *interim) convert() (uint64, []uint64, uint64, error) {

storedIndexOffset, err := s.writeStoredFields()
if err != nil {
return 0, nil, 0, err
return 0, 0, err
}

var dictOffsets []uint64

// we can persist the various sections at this point.
// the rule of thumb here is that each section must persist field wise.
for _, x := range segmentSections {
_, err = x.Persist(s.opaque, s.w)
if err != nil {
return 0, nil, 0, err
return 0, 0, err
}
}

Expand All @@ -231,18 +232,14 @@ func (s *interim) convert() (uint64, []uint64, uint64, error) {
}
}

if len(s.results) == 0 {
dictOffsets = make([]uint64, len(s.FieldsInv))
}

// we can persist a new fields section here
// this new fields section will point to the various indexes available
sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.w, dictOffsets, s.opaque)
sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.w, s.opaque)
if err != nil {
return 0, nil, 0, err
return 0, 0, err
}

return storedIndexOffset, dictOffsets, sectionsIndexOffset, nil
return storedIndexOffset, sectionsIndexOffset, nil
}

func (s *interim) getOrDefineField(fieldName string) int {
Expand Down
13 changes: 2 additions & 11 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (s *SegmentBase) loadFieldsNew() error {
if seek > uint64(len(s.mem)) {
// handling a buffer overflow case.
// a rare case where the backing buffer is not large enough to be read directly via
// a pos+binary.MaxVarinLen64 seek. For eg, this can happen when there is only
// a pos+binary.MaxVarintLen64 seek. For eg, this can happen when there is only
// one field to be indexed in the entire batch of data and while writing out
// these fields metadata, you write 1 + 8 bytes whereas the MaxVarintLen64 = 10.
seek = uint64(len(s.mem))
Expand All @@ -342,7 +342,7 @@ func (s *SegmentBase) loadFieldsNew() error {
// the following loop will be executed only once in the edge case pointed out above
// since there is only field's offset store which occupies 8 bytes.
// the pointer then seeks to a position preceding the sectionsIndexOffset, at
// which point the responbility of handling the out-of-bounds cases shifts to
// which point the responsibility of handling the out-of-bounds cases shifts to
// the specific section's parsing logic.
var fieldID uint64
for fieldID < numFields {
Expand Down Expand Up @@ -867,15 +867,6 @@ func (s *SegmentBase) loadDvReaders() error {

s.incrementBytesRead(read)

dataLoc, n := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
if n <= 0 {
return fmt.Errorf("loadDvReaders: failed to read the dataLoc "+
"offset for sectionID %v field %v", secID, s.fieldsInv[fieldID])
}
if secID == SectionInvertedTextIndex {
s.dictLocs = append(s.dictLocs, dataLoc)
s.incrementBytesRead(uint64(n))
}
fieldDvReader, err := s.loadFieldDocValueReader(s.fieldsInv[fieldID], fieldLocStart, fieldLocEnd)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion write.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer,
return tw, nil
}

func persistFieldsSection(fieldsInv []string, w *CountHashWriter, dictLocs []uint64, opaque map[int]resetable) (uint64, error) {
func persistFieldsSection(fieldsInv []string, w *CountHashWriter, opaque map[int]resetable) (uint64, error) {
var rv uint64
fieldsOffsets := make([]uint64, 0, len(fieldsInv))

Expand Down

0 comments on commit 75f3dcf

Please sign in to comment.