Skip to content

Commit

Permalink
unit test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Thejas-bhat committed Sep 13, 2023
1 parent bd6d08f commit 68b8c51
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 37 deletions.
4 changes: 4 additions & 0 deletions new.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ func (s *interim) convert() (uint64, uint64, uint64, []uint64, uint64, error) {
}
}

if len(s.results) == 0 {
dictOffsets = make([]uint64, len(s.FieldsInv))
}

// we can persist a new fields section here
// this new fields section will point to the various indexes available
sectionsIndexOffset, err := persistNewFields(s.FieldsInv, s.w, dictOffsets, s.opaque)
Expand Down
54 changes: 45 additions & 9 deletions section_inverted_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,9 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.
return fieldAddrs, fieldDvLocsOffset, nil
}

func (i *invertedIndexSection) Merge(opaque map[int]resetable, segments []*SegmentBase, drops []*roaring.Bitmap, fieldsInv []string,
newDocNumsIn [][]uint64, w *CountHashWriter, closeCh chan struct{}) error {

func (i *invertedIndexSection) Merge(opaque map[int]resetable, segments []*SegmentBase,
drops []*roaring.Bitmap, fieldsInv []string, newDocNumsIn [][]uint64,
w *CountHashWriter, closeCh chan struct{}) error {
io := i.getInvertedIndexOpaque(opaque)
fieldAddrs, _, err := mergeAndPersistInvertedSection(segments, drops, fieldsInv,
io.FieldsMap, io.fieldsSame, newDocNumsIn, io.numDocs, io.chunkMode, w, closeCh)
Expand All @@ -349,7 +349,6 @@ func (i *invertedIndexSection) Merge(opaque map[int]resetable, segments []*Segme
}

io.fieldAddrs = fieldAddrs

return nil
}

Expand All @@ -364,6 +363,12 @@ func (i *invertedIndexOpaque) grabBuf(size int) []byte {

func (io *invertedIndexOpaque) writeDicts(w *CountHashWriter) (dictOffsets []uint64, err error) {

if io.results == nil || len(io.results) == 0 {
// updateSectionOffsets(w, io.grabBuf(binary.MaxVarintLen64),
// fieldNotUninverted, fieldNotUninverted, 0)
return nil, nil
}

dictOffsets = make([]uint64, len(io.FieldsInv))

fdvOffsetsStart := make([]uint64, len(io.FieldsInv))
Expand Down Expand Up @@ -584,9 +589,34 @@ func (io *invertedIndexOpaque) writeDicts(w *CountHashWriter) (dictOffsets []uin
return dictOffsets, nil
}

// updateSectionOffsets appends three uvarint-encoded values to w, in order:
// dvStart, dvEnd and dictLoc.
//
// buf is scratch space used to encode each value before it is written;
// binary.PutUvarint panics if buf is too small to hold the encoding
// (binary.MaxVarintLen64 bytes is always sufficient).
//
// It returns the value of w.Count() sampled before the first write, i.e. the
// position at which this record begins. If any write fails, it returns 0 and
// that error; values written before the failure are not rolled back.
func updateSectionOffsets(w *CountHashWriter, buf []byte,
	dvStart uint64, dvEnd uint64, dictLoc uint64) (int, error) {
	// Sample the writer's count first so the caller learns where the
	// record starts rather than where it ends.
	recordStart := w.Count()

	// The three values share the same encode-then-write step.
	for _, value := range [...]uint64{dvStart, dvEnd, dictLoc} {
		size := binary.PutUvarint(buf, value)
		if _, err := w.Write(buf[:size]); err != nil {
			return 0, err
		}
	}

	return recordStart, nil
}

func (io *invertedIndexOpaque) process(field index.Field, fieldID uint16, docNum uint64) {
if !io.init && io.results != nil {
io.prepareDicts()
io.allocateSpace()
io.init = true
}

Expand Down Expand Up @@ -652,7 +682,7 @@ func (io *invertedIndexOpaque) process(field index.Field, fieldID uint16, docNum
}
}

func (i *invertedIndexOpaque) prepareDicts() {
func (i *invertedIndexOpaque) allocateSpace() {
var pidNext int

var totTFs int
Expand Down Expand Up @@ -842,13 +872,19 @@ type invertedIndexOpaque struct {

chunkMode uint32

FieldsInv []string
// indicates whether the following structs are initialized
init bool

// FieldsMap adds 1 to field id to avoid zero value issues
// name -> field id + 1
FieldsMap map[string]uint16

// indicates whether the following structs are initialized
init bool
// FieldsInv is the inverse of FieldsMap
// field id -> name
FieldsInv []string

// Term dictionaries for each field
// field id -> term -> postings list id + 1
Dicts []map[string]uint64

// Terms for each field, where terms are sorted ascending
Expand Down
60 changes: 32 additions & 28 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,23 @@ func (s *SegmentBase) loadFieldNew(fieldID uint16, addr uint64,
fieldSectionAddr := binary.BigEndian.Uint64(s.mem[pos : pos+8])
pos += 8
fieldSectionMap[fieldSectionType] = fieldSectionAddr
if fieldSectionType == sectionInvertedIndex {
// for the fields which don't have the inverted index, the offset is
// 0 and during query time, because there is no valid dictionary we
// will just follow a no-op path.
if fieldSectionAddr == 0 {
s.dictLocs = append(s.dictLocs, 0)
continue
}
// skip the doc values
_, n := binary.Uvarint(s.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64])
fieldSectionAddr += uint64(n)
_, n = binary.Uvarint(s.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64])
fieldSectionAddr += uint64(n)
dictLoc, _ := binary.Uvarint(s.mem[fieldSectionAddr : fieldSectionAddr+binary.MaxVarintLen64])

s.dictLocs = append(s.dictLocs, dictLoc)
}
}

return nil
Expand Down Expand Up @@ -671,7 +688,7 @@ func (s *Segment) DictAddr(field string) (uint64, error) {
return s.dictLocs[fieldIDPlus1-1], nil
}

func (s *Segment) getDvStartEndOffsets(fieldID int, secID uint16) (uint64, uint64, uint64, error) {
func (s *Segment) getSectionDvOffsets(fieldID int, secID uint16) (uint64, uint64, uint64, error) {
// Version is gonna be 16
var fieldLocStart uint64 = fieldNotUninverted
fieldLocEnd := fieldLocStart
Expand All @@ -694,21 +711,13 @@ func (s *Segment) getDvStartEndOffsets(fieldID int, secID uint16) (uint64, uint6
}
read += uint64(n)
// bytes read increment to be done here

if secID == sectionInvertedIndex {
dictLoc, n := binary.Uvarint(s.mem[fieldAddrStart+read : fieldAddrStart+read+binary.MaxVarintLen64])
if n <= 0 {
return 0, 0, 0, fmt.Errorf("loadDvReaders: failed to read the dictLoc offset for field %d", fieldID)
}
s.dictLocs = append(s.dictLocs, dictLoc)
}
}

return fieldLocStart, fieldLocEnd, 0, nil
}

func (s *Segment) loadDvReader(fieldID int, secID uint16) error {
start, end, _, err := s.getDvStartEndOffsets(fieldID, secID)
start, end, _, err := s.getSectionDvOffsets(fieldID, secID)
if err != nil {
return err
}
Expand All @@ -731,10 +740,6 @@ func (s *Segment) loadDvReader(fieldID int, secID uint16) error {
}

func (s *Segment) loadDvReadersLegacy() error {
if s.numDocs == 0 {
return nil
}

// older file formats to parse the docValueIndex and if that says doc values
// aren't there in this segment file, just return nil
if s.docValueOffset == fieldNotUninverted {
Expand Down Expand Up @@ -778,7 +783,6 @@ func (s *Segment) loadDvReadersLegacy() error {
// must account for the version while loading since the formats are different
// in the older and the Version version.
func (s *Segment) loadDvReaders() error {
// for every field
if s.numDocs == 0 {
return nil
}
Expand All @@ -787,8 +791,9 @@ func (s *Segment) loadDvReaders() error {
return s.loadDvReadersLegacy()
}

// for every section of every field, load the doc values and register
// the readers.
for fieldID := range s.fieldsInv {
// for every section
for secID := range segmentSections {
s.loadDvReader(fieldID, secID)
}
Expand All @@ -808,28 +813,28 @@ func (s *SegmentBase) loadDvReaders() error {
}

for fieldID, sections := range s.fieldsSectionsMap {
for secID, fieldAddrStart := range sections {
if fieldAddrStart > 0 {
for secID, secOffset := range sections {
if secOffset > 0 {
// fixed encoding as of now, need to uvarint this
var n uint64

fieldLocStart, read := binary.Uvarint(s.mem[fieldAddrStart+n : fieldAddrStart+n+binary.MaxVarintLen64])
pos := secOffset
fieldLocStart, read := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
if read <= 0 {
return fmt.Errorf("loadDvReaders: failed to read the docvalue offset start for field %v", s.fieldsInv[fieldID])
}
n += uint64(read)
fieldLocEnd, read := binary.Uvarint(s.mem[fieldAddrStart+n : fieldAddrStart+n+binary.MaxVarintLen64])
pos += uint64(read)
fieldLocEnd, read := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
if read <= 0 {
return fmt.Errorf("loadDvReaders: failed to read the docvalue offset end for field %v", s.fieldsInv[fieldID])
}
n += uint64(read)
pos += uint64(read)

dictLoc, read := binary.Uvarint(s.mem[fieldAddrStart+n : fieldAddrStart+n+binary.MaxVarintLen64])
dataLoc, read := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
if read <= 0 {
return fmt.Errorf("loadDvReaders: failed to read the dict offset for field %v", s.fieldsInv[fieldID])
return fmt.Errorf("loadDvReaders: failed to read the dataLoc "+
"offset for sectionID %v field %v", secID, s.fieldsInv[fieldID])
}
if secID == sectionInvertedIndex {
s.dictLocs = append(s.dictLocs, dictLoc)
s.dictLocs = append(s.dictLocs, dataLoc)
}
fieldDvReader, err := s.loadFieldDocValueReader(s.fieldsInv[fieldID], fieldLocStart, fieldLocEnd)
if err != nil {
Expand All @@ -844,7 +849,6 @@ func (s *SegmentBase) loadDvReaders() error {
}
}
}

}

return nil
Expand Down

0 comments on commit 68b8c51

Please sign in to comment.