Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updating merge path with the ivf index creations #173

Merged
merged 6 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,19 @@ require (
github.com/blevesearch/vellum v1.0.10
github.com/golang/snappy v0.0.1
github.com/spf13/cobra v1.4.0
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
)

require (
github.com/bits-and-blooms/bitset v1.2.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/sys v0.13.0 // indirect
)

replace github.com/blevesearch/bleve_index_api => ../bleve_index_api

replace github.com/blevesearch/go-faiss => /Users/thejasbhat/fts/vector_search/go-faiss
replace github.com/blevesearch/go-faiss => ../go-faiss

replace github.com/blevesearch/scorch_segment_api/v2 => ../scorch_segment_api
5 changes: 4 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
Expand Down
82 changes: 70 additions & 12 deletions section_vector_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/RoaringBitmap/roaring"
index "github.com/blevesearch/bleve_index_api"
faiss "github.com/blevesearch/go-faiss"
"golang.org/x/exp/maps"
)

func init() {
Expand Down Expand Up @@ -46,6 +47,7 @@ func (v *vectorIndexSection) AddrForField(opaque map[int]resetable, fieldID int)
// vecIndexMeta is per-segment bookkeeping gathered while scanning a
// segment's serialized vector index for a field, consumed later when
// the per-segment indexes are merged.
type vecIndexMeta struct {
	startOffset int    // presumably the byte offset of the serialized index within the segment — TODO confirm against writer
	indexSize   uint64 // presumably the serialized index length in bytes — TODO confirm against writer
	// vecIds holds the vector IDs decoded from this segment's index, in
	// on-disk order; used to reconstruct the segment's vectors in batch
	// during merge (ReconstructBatch).
	vecIds []int64
}

func remapDocIDs(oldIDs *roaring.Bitmap, newIDs []uint64) *roaring.Bitmap {
Expand All @@ -68,7 +70,7 @@ LOOP:
for fieldID, _ := range fieldsInv {

var indexes []vecIndexMeta
vecToDocID := make(map[uint64]*roaring.Bitmap)
vecToDocID := make(map[int64]*roaring.Bitmap)

// todo: would fetching the following data from the segments in parallel
// be beneficial in terms of perf?
Expand Down Expand Up @@ -101,6 +103,7 @@ LOOP:

numVecs, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64])
pos += n
indexes[len(indexes)-1].vecIds = make([]int64, 0, numVecs)

for i := 0; i < int(numVecs); i++ {
vecID, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64])
Expand All @@ -119,15 +122,17 @@ LOOP:
}

bitMap = remapDocIDs(bitMap, newDocNumsIn[segI])
if vecToDocID[vecID] == nil {
if vecToDocID[int64(vecID)] == nil {
if drops[segI] != nil && !drops[segI].IsEmpty() {
vecToDocID[vecID] = roaring.AndNot(bitMap, drops[segI])
vecToDocID[int64(vecID)] = roaring.AndNot(bitMap, drops[segI])
} else {
vecToDocID[vecID] = bitMap
vecToDocID[int64(vecID)] = bitMap
}
} else {
vecToDocID[vecID].Or(bitMap)
vecToDocID[int64(vecID)].Or(bitMap)
}

indexes[len(indexes)-1].vecIds = append(indexes[len(indexes)-1].vecIds, int64(vecID))
}
}
err := vo.mergeAndWriteVectorIndexes(fieldID, segments, vecToDocID, indexes, w, closeCh)
Expand All @@ -139,7 +144,7 @@ LOOP:
return nil
}

func (v *vectorIndexOpaque) flushVectorSection(vecToDocID map[uint64]*roaring.Bitmap,
func (v *vectorIndexOpaque) flushVectorSection(vecToDocID map[int64]*roaring.Bitmap,
serializedIndex []byte, w *CountHashWriter) (int, error) {
tempBuf := v.grabBuf(binary.MaxVarintLen64)
fieldStart := w.Count()
Expand Down Expand Up @@ -193,12 +198,7 @@ func (v *vectorIndexOpaque) flushVectorSection(vecToDocID map[uint64]*roaring.Bi
// todo: naive implementation. need to keep in mind the perf implications and improve on this.
// perhaps, parallelized merging can help speed things up over here.
func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*SegmentBase,
vecToDocID map[uint64]*roaring.Bitmap, indexes []vecIndexMeta, w *CountHashWriter, closeCh chan struct{}) error {
if len(vecToDocID) >= 100000 {
// merging of more complex index types (for eg ivf family) with reconstruction
// method.
return fmt.Errorf("to be implemented")
}
vecToDocID map[int64]*roaring.Bitmap, indexes []vecIndexMeta, w *CountHashWriter, closeCh chan struct{}) error {

var vecIndexes []*faiss.IndexImpl
for segI, seg := range sbs {
Expand All @@ -211,6 +211,64 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*Segme
vecIndexes = append(vecIndexes, index)
}

if len(vecToDocID) > 10000 {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should perhaps make this configurable?

If you chose 10000 only because faiss advertises that flat indexes perform well up to 10000 — I'm wondering whether we should do some testing on our side to decide what a better value for this could be.

// merging of more complex index types (e.g. the IVF family) using the
// reconstruction method.
var indexData []float32
for i := 0; i < len(vecIndexes); i++ {
if isClosed(closeCh) {
return fmt.Errorf("merging of vector sections aborted")
}
// todo: parallelize reconstruction
recons, err := vecIndexes[i].ReconstructBatch(int64(len(indexes[i].vecIds)), indexes[i].vecIds)
if err != nil {
return err
}
indexData = append(indexData, recons...)
}

// safe to assume that all the indexes are of the same config values, given
// that they are extracted from the field mapping info.
dims := vecIndexes[0].D()
metric := vecIndexes[0].MetricType()
finalVecIDs := maps.Keys(vecToDocID)

index, err := faiss.IndexFactory(dims, "IDMap2,IVF100,SQ8", metric)
if err != nil {
return err
}

index, _ = index.AsIVF()
err = index.MakeDirectMap(2)
abhinavdangeti marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
err = index.Train(indexData)
if err != nil {
return err
}

index.AddWithIDs(indexData, finalVecIDs)
if err != nil {
return err
}

mergedIndexBytes, err := faiss.WriteIndexIntoBuffer(index)
if err != nil {
return err
}

fieldStart, err := v.flushVectorSection(vecToDocID, mergedIndexBytes, w)
if err != nil {
return err
}
v.fieldAddrs[uint16(fieldID)] = fieldStart

return nil
}

// todo: ivf -> flat index when there were huge number of vector deletes for this field

for i := 1; i < len(vecIndexes); i++ {
if isClosed(closeCh) {
return fmt.Errorf("merging of vector sections aborted")
Expand Down
1 change: 0 additions & 1 deletion segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,6 @@ func (s *SegmentBase) visitStoredFields(vdc *visitDocumentCtx, num uint64,
arrayPos[i] = ap
}
}

value := uncompressed[offset : offset+l]
keepGoing = visitor(s.fieldsInv[field], byte(typ), value, arrayPos)
}
Expand Down