diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 2719d437..fd0fec90 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -2,6 +2,7 @@ on: push: branches: - master + - v15.x - v14.x - v13.x - v12.x @@ -12,7 +13,7 @@ jobs: test: strategy: matrix: - go-version: [1.18.x, 1.19.x, 1.20.x] + go-version: [1.19.x, 1.20.x, 1.21.x] platform: [ubuntu-latest, macos-latest] runs-on: ${{ matrix.platform }} steps: diff --git a/cmd/zap/cmd/dict.go b/cmd/zap/cmd/dict.go index 51c2df15..93a2e83c 100644 --- a/cmd/zap/cmd/dict.go +++ b/cmd/zap/cmd/dict.go @@ -21,7 +21,7 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/blevesearch/vellum" - zap "github.com/blevesearch/zapx/v15" + zap "github.com/blevesearch/zapx/v16" "github.com/spf13/cobra" ) diff --git a/cmd/zap/cmd/docvalue.go b/cmd/zap/cmd/docvalue.go index fad9a0fa..7586b2e7 100644 --- a/cmd/zap/cmd/docvalue.go +++ b/cmd/zap/cmd/docvalue.go @@ -23,7 +23,7 @@ import ( "sort" "strconv" - zap "github.com/blevesearch/zapx/v15" + zap "github.com/blevesearch/zapx/v16" "github.com/golang/snappy" "github.com/spf13/cobra" ) diff --git a/cmd/zap/cmd/explore.go b/cmd/zap/cmd/explore.go index 46359b20..4ce390d4 100644 --- a/cmd/zap/cmd/explore.go +++ b/cmd/zap/cmd/explore.go @@ -21,7 +21,7 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/blevesearch/vellum" - zap "github.com/blevesearch/zapx/v15" + zap "github.com/blevesearch/zapx/v16" "github.com/spf13/cobra" ) diff --git a/cmd/zap/cmd/fields.go b/cmd/zap/cmd/fields.go index 9f5af6f1..46f6152a 100644 --- a/cmd/zap/cmd/fields.go +++ b/cmd/zap/cmd/fields.go @@ -18,7 +18,7 @@ import ( "encoding/binary" "fmt" - zap "github.com/blevesearch/zapx/v15" + zap "github.com/blevesearch/zapx/v16" "github.com/spf13/cobra" ) diff --git a/cmd/zap/cmd/root.go b/cmd/zap/cmd/root.go index 86f322f0..8320d1da 100644 --- a/cmd/zap/cmd/root.go +++ b/cmd/zap/cmd/root.go @@ -18,7 +18,7 @@ import ( "fmt" "os" - zap 
"github.com/blevesearch/zapx/v15" + zap "github.com/blevesearch/zapx/v16" "github.com/spf13/cobra" ) diff --git a/cmd/zap/main.go b/cmd/zap/main.go index 788404c3..ab0e00e7 100644 --- a/cmd/zap/main.go +++ b/cmd/zap/main.go @@ -15,7 +15,7 @@ package main import ( - "github.com/blevesearch/zapx/v15/cmd/zap/cmd" + "github.com/blevesearch/zapx/v16/cmd/zap/cmd" ) func main() { diff --git a/docvalues.go b/docvalues.go index 31d7f6eb..30f46118 100644 --- a/docvalues.go +++ b/docvalues.go @@ -309,7 +309,7 @@ func (s *SegmentBase) VisitDocValues(localDocNum uint64, fields []string, continue } fieldID := fieldIDPlus1 - 1 - if dvIter, exists := s.fieldDvReaders[sectionInvertedIndex][fieldID]; exists && + if dvIter, exists := s.fieldDvReaders[sectionInvertedTextIndex][fieldID]; exists && dvIter != nil { dvs.dvrs[fieldID] = dvIter.cloneInto(dvs.dvrs[fieldID]) } diff --git a/faiss_vector_posting.go b/faiss_vector_posting.go new file mode 100644 index 00000000..017c3101 --- /dev/null +++ b/faiss_vector_posting.go @@ -0,0 +1,357 @@ +// Copyright (c) 2023 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +//go:build vectors +// +build vectors + +package zap + +import ( + "encoding/binary" + "math" + "reflect" + + "github.com/RoaringBitmap/roaring" + "github.com/RoaringBitmap/roaring/roaring64" + faiss "github.com/blevesearch/go-faiss" + segment "github.com/blevesearch/scorch_segment_api/v2" +) + +var reflectStaticSizeVecPostingsList int +var reflectStaticSizeVecPostingsIterator int +var reflectStaticSizeVecPosting int + +func init() { + var pl VecPostingsList + reflectStaticSizeVecPostingsList = int(reflect.TypeOf(pl).Size()) + var pi VecPostingsIterator + reflectStaticSizeVecPostingsIterator = int(reflect.TypeOf(pi).Size()) + var p VecPosting + reflectStaticSizeVecPosting = int(reflect.TypeOf(p).Size()) +} + +type VecPosting struct { + docNum uint64 + score float32 +} + +func (vp *VecPosting) Number() uint64 { + return vp.docNum +} + +func (vp *VecPosting) Score() float32 { + return vp.score +} + +func (vp *VecPosting) Size() int { + sizeInBytes := reflectStaticSizeVecPosting // static size of VecPosting, as computed in init() + + return sizeInBytes +} + +// ============================================================================= + +// the vector postings list is supposed to store the docNum and its similarity +// score as a vector postings entry in it. +// The way in which it is stored is using a roaring64 bitmap. +// the docNum is stored in high 32 and the lower 32 bits contains the score value. +// the score is actually a float32 value and in order to store it as a uint32 in +// the bitmap, we use the IEEE 754 floating point format. +// +// each entry in the roaring64 bitmap of the vector postings list is a 64 bit +// number which looks like this: +// MSB LSB +// |64 63 62 ... 32| 31 30 ... 
0| +// | | | +type VecPostingsList struct { + // todo: perhaps we don't even need to store a bitmap if there is only + // one similar vector the query, but rather store it as a field value + // in the struct + except *roaring64.Bitmap + postings *roaring64.Bitmap +} + +var emptyVecPostingsIterator = &VecPostingsIterator{} +var emptyVecPostingsList = &VecPostingsList{} + +func (vpl *VecPostingsList) Iterator(prealloc segment.VecPostingsIterator) segment.VecPostingsIterator { + + // tbd: do we check the cardinality of postings and scores? + var preallocPI *VecPostingsIterator + pi, ok := prealloc.(*VecPostingsIterator) + if ok && pi != nil { + preallocPI = pi + } + if preallocPI == emptyVecPostingsIterator { + preallocPI = nil + } + + return vpl.iterator(preallocPI) +} + +func (p *VecPostingsList) iterator(rv *VecPostingsIterator) *VecPostingsIterator { + if rv == nil { + rv = &VecPostingsIterator{} + } else { + *rv = VecPostingsIterator{} // clear the struct + } + // think on some of the edge cases over here. + if p.postings == nil { + return rv + } + rv.postings = p + rv.all = p.postings.Iterator() + if p.except != nil { + rv.ActualBM = roaring64.AndNot(p.postings, p.except) + rv.Actual = rv.ActualBM.Iterator() + } else { + rv.ActualBM = p.postings + rv.Actual = rv.all // Optimize to use same iterator for all & Actual. 
+ } + return rv +} + +func (p *VecPostingsList) Size() int { + sizeInBytes := reflectStaticSizeVecPostingsList + SizeOfPtr + + if p.except != nil { + sizeInBytes += int(p.except.GetSizeInBytes()) + } + + return sizeInBytes +} + +func (p *VecPostingsList) Count() uint64 { + n := p.postings.GetCardinality() + var e uint64 + if p.except != nil { + e = p.postings.AndCardinality(p.except) + } + return n - e +} + +func (vpl *VecPostingsList) ResetBytesRead(val uint64) { + +} + +func (vpl *VecPostingsList) BytesRead() uint64 { + return 0 +} + +func (vpl *VecPostingsList) BytesWritten() uint64 { + return 0 +} + +// ============================================================================= + +const maskLow32Bits = 0x7fffffff + +type VecPostingsIterator struct { + postings *VecPostingsList + all roaring64.IntPeekable64 + Actual roaring64.IntPeekable64 + ActualBM *roaring64.Bitmap + + next VecPosting // reused across Next() calls +} + +func (i *VecPostingsIterator) nextCodeAtOrAfterClean(atOrAfter uint64) (uint64, bool, error) { + i.Actual.AdvanceIfNeeded(atOrAfter) + + if !i.Actual.HasNext() { + return 0, false, nil // couldn't find anything + } + + return i.Actual.Next(), true, nil +} + +func (i *VecPostingsIterator) nextCodeAtOrAfter(atOrAfter uint64) (uint64, bool, error) { + if i.Actual == nil || !i.Actual.HasNext() { + return 0, false, nil + } + + if i.postings == nil || i.postings == emptyVecPostingsList { + // couldn't find anything + return 0, false, nil + } + + if i.postings.postings == i.ActualBM { + return i.nextCodeAtOrAfterClean(atOrAfter) + } + + i.Actual.AdvanceIfNeeded(atOrAfter) + + if !i.Actual.HasNext() || !i.all.HasNext() { + // couldn't find anything + return 0, false, nil + } + + n := i.Actual.Next() + allN := i.all.Next() + + // n is the next actual hit (excluding some postings), and + // allN is the next hit in the full postings, and + // if they don't match, move 'all' forwards until they do. 
+ for allN != n { + if !i.all.HasNext() { + return 0, false, nil + } + allN = i.all.Next() + } + + return uint64(n), true, nil +} + +// a transformation function which stores both the score and the docNum as a single +// entry which is a uint64 number. +func getVectorCode(docNum uint32, score float32) uint64 { + return uint64(docNum)<<31 | uint64(math.Float32bits(score)) +} + +// nextAtOrAfter returns the next posting at or after the given docNum, or nil at the end +func (i *VecPostingsIterator) nextAtOrAfter(atOrAfter uint64) (segment.VecPosting, error) { + // transform the docNum provided to the vector code format and use that to + // get the next entry. the comparison still happens docNum wise since after + // the transformation, the docNum occupies the upper 32 bits just an entry in + // the postings list + atOrAfter = getVectorCode(uint32(atOrAfter), 0) + code, exists, err := i.nextCodeAtOrAfter(atOrAfter) + if err != nil || !exists { + return nil, err + } + + i.next = VecPosting{} // clear the struct + rv := &i.next + rv.score = math.Float32frombits(uint32(code & maskLow32Bits)) + rv.docNum = code >> 31 + + return rv, nil +} + +func (itr *VecPostingsIterator) Next() (segment.VecPosting, error) { + return itr.nextAtOrAfter(0) +} + +func (itr *VecPostingsIterator) Advance(docNum uint64) (segment.VecPosting, error) { + return itr.nextAtOrAfter(docNum) +} + +func (i *VecPostingsIterator) Size() int { + sizeInBytes := reflectStaticSizeVecPostingsIterator + SizeOfPtr + + i.next.Size() + + return sizeInBytes +} + +func (vpl *VecPostingsIterator) ResetBytesRead(val uint64) { + +} + +func (vpl *VecPostingsIterator) BytesRead() uint64 { + return 0 +} + +func (vpl *VecPostingsIterator) BytesWritten() uint64 { + return 0 +} + +func (sb *SegmentBase) SimilarVectors(field string, qVector []float32, k int64, except *roaring.Bitmap) (segment.VecPostingsList, error) { + + // 1. returned postings list (of type PostingsList) has two types of information - docNum and its score. + // 2. 
both the values can be represented using roaring bitmaps. + // 3. the Iterator (of type PostingsIterator) returned would operate in terms of VecPostings. + // 4. VecPostings would just have the docNum and the score. Every call of Next() + // and Advance just returns the next VecPostings. The caller would do a vp.Number() + // and the Score() to get the corresponding values + rv := &VecPostingsList{ + except: nil, // todo: handle the except bitmap within postings iterator. + postings: roaring64.New(), + } + + vecDocIDMap := make(map[int64][]uint32) + fieldIDPlus1 := sb.fieldsMap[field] + if fieldIDPlus1 > 0 { + vectorSection := sb.fieldsSectionsMap[fieldIDPlus1-1][sectionFaissVectorIndex] + // check if the field has a vector section in the segment. + if vectorSection > 0 { + pos := int(vectorSection) + + // loading doc values - adhering to the sections format. never + // valid values for vector section + _, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += n + + _, n = binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += n + + // todo: not a good idea to cache the vector index perhaps, since it could be quite huge. + indexSize, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += n + indexBytes := sb.mem[pos : pos+int(indexSize)] + pos += int(indexSize) + + // read the number vectors indexed for this field and load the vector to docID mapping. 
+ // todo: cache the vecID to docIDs mapping for a fieldID + numVecs, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += n + for i := 0; i < int(numVecs); i++ { + vecID, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += n + + bitMapLen, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += n + + roaringBytes := sb.mem[pos : pos+int(bitMapLen)] + pos += int(bitMapLen) + + bitMap := roaring.NewBitmap() + _, err := bitMap.FromBuffer(roaringBytes) + if err != nil { + return nil, err + } + + vecDocIDMap[int64(vecID)] = bitMap.ToArray() + } + + vecIndex, err := faiss.ReadIndexFromBuffer(indexBytes, faiss.IOFlagReadOnly) + if err != nil { + return nil, err + } + + scores, ids, err := vecIndex.Search(qVector, k) + if err != nil { + return nil, err + } + + // for every similar vector returned by the Search() API, add the corresponding + // docID and the score to the newly created vecPostingsList + for i := 0; i < len(ids); i++ { + vecID := ids[i] + docIDs := vecDocIDMap[vecID] + var code uint64 + + for _, docID := range docIDs { + if except != nil && except.Contains(docID) { + // ignore the deleted doc + continue + } + code = getVectorCode(docID, scores[i]) + rv.postings.Add(uint64(code)) + } + } + } + } + + return rv, nil +} diff --git a/faiss_vector_test.go b/faiss_vector_test.go new file mode 100644 index 00000000..84cddad8 --- /dev/null +++ b/faiss_vector_test.go @@ -0,0 +1,460 @@ +//go:build vectors +// +build vectors + +package zap + +import ( + "encoding/binary" + "fmt" + "math" + "os" + "testing" + + "github.com/RoaringBitmap/roaring/roaring64" + index "github.com/blevesearch/bleve_index_api" + faiss "github.com/blevesearch/go-faiss" + segment "github.com/blevesearch/scorch_segment_api/v2" +) + +func getStubDocScores(k int) (ids []uint64, scores []float32, err error) { + for i := 1; i <= k; i++ { + ids = append(ids, uint64(i)) + scores = append(scores, float32(2*i+3)/200) // divide as float32; integer division would zero every score + } + return ids, scores, nil +} + 
+func TestVecPostingsIterator(t *testing.T) { + + vecPL := &VecPostingsList{ + postings: roaring64.New(), + } + + ids, scores, err := getStubDocScores(10) + if err != nil { + t.Fatal(err) + } + docIDs := make(map[uint64]float32) + + for i, id := range ids { + code := uint64(id)<<31 | uint64(math.Float32bits(scores[i])) + vecPL.postings.Add(code) + docIDs[id] = scores[i] + } + + iter := vecPL.Iterator(nil) + for i := 0; true; i++ { + vp, err := iter.Next() + if err != nil { + t.Fatal(err) + } + if vp == nil { + break + } + if vp.Number() != ids[i] { + t.Fatalf("expected %d, got %d", ids[i], vp.Number()) + } + if vp.Score() != scores[i] { + t.Fatalf("expected %f, got %f", scores[i], vp.Score()) + } + } +} + +type stubVecField struct { + name string + value []float32 + dims int + similarity string + encodedType byte + options index.FieldIndexingOptions +} + +// Vector is an implementation of the index.VectorField interface. +func (n *stubVecField) Vector() []float32 { + return n.value +} + +func (n *stubVecField) Similarity() string { + return n.similarity +} + +func (n *stubVecField) Dims() int { + return n.dims +} + +func (n *stubVecField) Size() int { + return 0 +} + +func (n *stubVecField) Name() string { + return n.name +} + +func (n *stubVecField) ArrayPositions() []uint64 { + return nil +} + +func (n *stubVecField) Options() index.FieldIndexingOptions { + return n.options +} + +func (n *stubVecField) NumPlainTextBytes() uint64 { + return 0 +} + +func (n *stubVecField) AnalyzedLength() int { + // dense vectors aren't analyzed + return 0 +} + +func (n *stubVecField) EncodedFieldType() byte { + return 'v' +} + +func (n *stubVecField) AnalyzedTokenFrequencies() index.TokenFrequencies { + // dense vectors aren't analyzed + return nil +} + +func (n *stubVecField) Analyze() { + // dense vectors aren't analyzed +} + +func (n *stubVecField) Value() []byte { + return nil +} + +func newStubFieldVec(name string, vector []float32, d int, metric string, fieldOptions 
index.FieldIndexingOptions) index.Field { + return &stubVecField{ + name: name, + value: vector, + dims: d, + similarity: metric, + encodedType: 'v', + options: fieldOptions, + } +} + +func stubVecData() [][]float32 { + rv := [][]float32{ + {1.0, 2.0, 3.0}, + {12.0, 42.6, 78.65}, + {6.7, 0.876, 9.45}, + {7.437, 9.994, 0.407}, + {4.439, 0.307, 1.063}, + {6.653, 7.752, 0.972}, + } + return rv +} + +func stubVec1Data() [][]float32 { + rv := [][]float32{ + {5.6, 2.3, 9.8}, + {89.1, 312.7, 940.65}, + {123.4, 8.98, 0.765}, + {0.413, 9.054, 3.393}, + {2.463, 3.388, 2.082}, + {3.371, 3.473, 6.906}, + } + return rv +} + +func buildMultiDocDataset() []index.Document { + + stubVecs := stubVecData() + stubVecs1 := stubVec1Data() + + doc1 := newStubDocument("a", []*stubField{ + newStubFieldSplitString("_id", nil, "a", true, false, false), + newStubFieldSplitString("name", nil, "wow", true, false, true), + newStubFieldSplitString("desc", nil, "some thing", true, false, true), + newStubFieldSplitString("tag", []uint64{0}, "cold", true, false, true), + newStubFieldSplitString("tag", []uint64{1}, "dark", true, false, true), + }, "_all") + + doc2 := newStubDocument("b", []*stubField{ + newStubFieldSplitString("_id", nil, "b", true, false, false), + newStubFieldSplitString("name", nil, "who", true, false, true), + newStubFieldSplitString("desc", nil, "some thing", true, false, true), + newStubFieldSplitString("tag", []uint64{0}, "cold", true, false, true), + newStubFieldSplitString("tag", []uint64{1}, "dark", true, false, true), + }, "_all") + + doc3 := newVecStubDocument("c", []index.Field{ + newStubFieldSplitString("_id", nil, "c", true, false, false), + newStubFieldVec("stubVec", stubVecs[0], 3, "l2", index.IndexField), + newStubFieldVec("stubVec2", stubVecs1[0], 3, "l2", index.IndexField), + }) + + doc4 := newVecStubDocument("d", []index.Field{ + newStubFieldSplitString("_id", nil, "d", true, false, false), + newStubFieldVec("stubVec", stubVecs[1], 3, "l2", index.IndexField), + 
newStubFieldVec("stubVec2", stubVecs1[1], 3, "l2", index.IndexField), + }) + doc5 := newVecStubDocument("e", []index.Field{ + newStubFieldSplitString("_id", nil, "e", true, false, false), + newStubFieldVec("stubVec", stubVecs[2], 3, "l2", index.IndexField), + newStubFieldVec("stubVec2", stubVecs1[2], 3, "l2", index.IndexField), + }) + + doc6 := newVecStubDocument("f", []index.Field{ + newStubFieldSplitString("_id", nil, "f", true, false, false), + newStubFieldVec("stubVec", stubVecs[3], 3, "l2", index.IndexField), + newStubFieldVec("stubVec2", stubVecs1[3], 3, "l2", index.IndexField), + }) + doc7 := newVecStubDocument("g", []index.Field{ + newStubFieldSplitString("_id", nil, "g", true, false, false), + newStubFieldVec("stubVec", stubVecs[4], 3, "l2", index.IndexField), + newStubFieldVec("stubVec2", stubVecs1[4], 3, "l2", index.IndexField), + }) + + doc8 := newVecStubDocument("h", []index.Field{ + newStubFieldSplitString("_id", nil, "h", true, false, false), + newStubFieldVec("stubVec", stubVecs[5], 3, "l2", index.IndexField), + newStubFieldVec("stubVec2", stubVecs1[5], 3, "l2", index.IndexField), + }) + + results := []index.Document{ + doc1, + doc2, + doc3, + doc4, + doc5, + doc6, + doc7, + doc8, + } + + return results +} + +type stubVecDocument struct { + id string + fields []index.Field + composite []*stubField +} + +func (s *stubVecDocument) StoredFieldsBytes() uint64 { + return 0 +} + +func (s *stubVecDocument) ID() string { + return s.id +} + +func (s *stubVecDocument) Size() int { + return 0 +} + +func (s *stubVecDocument) VisitFields(visitor index.FieldVisitor) { + for _, f := range s.fields { + visitor(f) + } +} + +func (s *stubVecDocument) HasComposite() bool { + return len(s.composite) > 0 +} + +func (s *stubVecDocument) VisitComposite(visitor index.CompositeFieldVisitor) { + for _, c := range s.composite { + visitor(c) + } +} + +func (s *stubVecDocument) NumPlainTextBytes() uint64 { + return 0 +} + +func (s *stubVecDocument) AddIDField() { + +} + +func 
newVecStubDocument(id string, fields []index.Field) *stubVecDocument { + return &stubVecDocument{ + id: id, + fields: fields, + } +} + +func getSectionContentOffsets(sb *SegmentBase, offset uint64) ( + docValueStart uint64, + docValueEnd uint64, + indexBytesLen uint64, + indexBytesOffset uint64, + numVecs uint64, + vecDocIDsMappingOffset uint64, +) { + pos := offset + docValueStart, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += uint64(n) + + docValueEnd, n = binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += uint64(n) + + indexBytesLen, n = binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += uint64(n) + + indexBytesOffset = pos + pos += indexBytesLen + + numVecs, n = binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += uint64(n) + + vecDocIDsMappingOffset = pos + + return docValueStart, docValueEnd, indexBytesLen, indexBytesOffset, numVecs, vecDocIDsMappingOffset +} + +func serializeVecs(dataset [][]float32) []float32 { + var vecs []float32 + for _, vec := range dataset { + vecs = append(vecs, vec...) 
+ } + return vecs +} + +func letsCreateVectorIndexForTesting(dataset [][]float32, dims int, similarity string) (*faiss.IndexImpl, error) { + vecs := serializeVecs(dataset) + + idx, err := faiss.IndexFactory(dims, "Flat,IDMap2", faiss.MetricL2) + if err != nil { + return nil, err + } + + idx.Train(vecs) + + ids := make([]int64, len(dataset)) + for i := 0; i < len(dataset); i++ { + ids[i] = int64(i) + } + + idx.AddWithIDs(vecs, ids) + + return idx, nil +} + +func TestVectorSegment(t *testing.T) { + docs := buildMultiDocDataset() + + vecSegPlugin := &ZapPlugin{} + seg, _, err := vecSegPlugin.New(docs) + if err != nil { + t.Fatal(err) + } + vecSegBase, ok := seg.(*SegmentBase) + if !ok { + t.Fatal("not a segment base") + } + + path := "./test-seg" + err = vecSegBase.Persist(path) + if err != nil { + t.Fatal(err) + } + + segOnDisk, err := vecSegPlugin.Open(path) + if err != nil { + t.Fatal(err) + } + + fieldsSectionsMap := vecSegBase.fieldsSectionsMap + stubVecFieldStartAddr := fieldsSectionsMap[vecSegBase.fieldsMap["stubVec"]-1][sectionFaissVectorIndex] + docValueStart, docValueEnd, indexBytesLen, _, + numVecs, _ := getSectionContentOffsets(vecSegBase, stubVecFieldStartAddr) + + if docValueStart != fieldNotUninverted { + t.Fatal("vector field doesn't support doc values") + } + + if docValueEnd != fieldNotUninverted { + t.Fatal("vector field doesn't support doc values") + } + + data := stubVecData() + vecIndex, err := letsCreateVectorIndexForTesting(data, 3, "l2") + if err != nil { + t.Fatalf("error creating vector index %v", err) + } + buf, err := faiss.WriteIndexIntoBuffer(vecIndex) + if err != nil { + t.Fatalf("error serializing vector index %v", err) + } + + if indexBytesLen != uint64(len(buf)) { + t.Fatalf("expected %d bytes got %d bytes", len(buf), indexBytesLen) + } + + if numVecs != uint64(vecIndex.Ntotal()) { + t.Fatalf("expected %d vecs got %d vecs", vecIndex.Ntotal(), numVecs) + } + + if vecSeg, ok := segOnDisk.(segment.VectorSegment); ok { + pl, err := 
vecSeg.SimilarVectors("stubVec", []float32{0.0, 0.0, 0.0}, 3, nil) + if err != nil { + t.Fatal(err) + } + itr := pl.Iterator(nil) + + for { + next, err := itr.Next() + if err != nil { + t.Fatal(err) + } + if next == nil { + break + } + fmt.Printf("similar vec %v score %v\n", next.Number(), next.Score()) + } + } +} + +func TestPersistedVectorSegment(t *testing.T) { + docs := buildMultiDocDataset() + + vecSegPlugin := &ZapPlugin{} + seg, _, err := vecSegPlugin.New(docs) + if err != nil { + t.Fatal(err) + } + + path := "./test-seg" + if unPersistedSeg, ok := seg.(segment.UnpersistedSegment); ok { + err = unPersistedSeg.Persist(path) + if err != nil { + t.Fatal(err) + } + } + + segOnDisk, err := vecSegPlugin.Open(path) + if err != nil { + t.Fatal(err) + } + + defer func() { + cerr := segOnDisk.Close() + if cerr != nil { + t.Fatalf("error closing segment: %v", cerr) + } + _ = os.RemoveAll(path) + }() + + if vecSeg, ok := segOnDisk.(segment.VectorSegment); ok { + pl, err := vecSeg.SimilarVectors("stubVec", []float32{0.0, 0.0, 0.0}, 3, nil) + if err != nil { + t.Fatal(err) + } + itr := pl.Iterator(nil) + + for { + next, err := itr.Next() + if err != nil { + t.Fatal(err) + } + if next == nil { + break + } + fmt.Printf("similar vec %v score %v\n", next.Number(), next.Score()) + } + } +} diff --git a/go.mod b/go.mod index ed5fabcc..4d9c56c6 100644 --- a/go.mod +++ b/go.mod @@ -1,21 +1,23 @@ -module github.com/blevesearch/zapx/v15 +module github.com/blevesearch/zapx/v16 -go 1.19 +go 1.20 require ( github.com/RoaringBitmap/roaring v1.2.3 - github.com/blevesearch/bleve_index_api v1.0.5 + github.com/blevesearch/bleve_index_api v1.1.2 + github.com/blevesearch/go-faiss v1.0.1 github.com/blevesearch/mmap-go v1.0.4 - github.com/blevesearch/scorch_segment_api/v2 v2.1.5 + github.com/blevesearch/scorch_segment_api/v2 v2.2.2 github.com/blevesearch/vellum v1.0.10 github.com/golang/snappy v0.0.1 - github.com/spf13/cobra v1.4.0 + github.com/spf13/cobra v1.7.0 + golang.org/x/exp 
v0.0.0-20231006140011-7918f672742d ) require ( github.com/bits-and-blooms/bitset v1.2.0 // indirect - github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/mschoch/smat v0.2.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect + golang.org/x/sys v0.13.0 // indirect ) diff --git a/go.sum b/go.sum index 821c9fd5..49463177 100644 --- a/go.sum +++ b/go.sum @@ -2,36 +2,41 @@ github.com/RoaringBitmap/roaring v1.2.3 h1:yqreLINqIrX22ErkKI0vY47/ivtJr6n+kMhVO github.com/RoaringBitmap/roaring v1.2.3/go.mod h1:plvDsJQpxOC5bw8LRteu/MLWHsHez/3y6cubLI4/1yE= github.com/bits-and-blooms/bitset v1.2.0 h1:Kn4yilvwNtMACtf1eYDlG8H77R07mZSPbMjLyS07ChA= github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= -github.com/blevesearch/bleve_index_api v1.0.5 h1:Lc986kpC4Z0/n1g3gg8ul7H+lxgOQPcXb9SxvQGu+tw= -github.com/blevesearch/bleve_index_api v1.0.5/go.mod h1:YXMDwaXFFXwncRS8UobWs7nvo0DmusriM1nztTlj1ms= +github.com/blevesearch/bleve_index_api v1.1.2 h1:A8MhXiNbZ9DI+lZytWkOY75MwesRdOlE+7/MzdC9YXY= +github.com/blevesearch/bleve_index_api v1.1.2/go.mod h1:PbcwjIcRmjhGbkS/lJCpfgVSMROV6TRubGGAODaK1W8= +github.com/blevesearch/go-faiss v1.0.1 h1:B0/FGdmcdxHIM0DRPyy4aWk0ZjMTFbCsmIzra77GAxE= +github.com/blevesearch/go-faiss v1.0.1/go.mod h1:jrxHrbl42X/RnDPI+wBoZU8joxxuRwedrxqswQ3xfU8= github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc= github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs= -github.com/blevesearch/scorch_segment_api/v2 v2.1.5 h1:1g713kpCQZ8u4a3stRGBfrwVOuGRnmxOVU5MQkUPrHU= -github.com/blevesearch/scorch_segment_api/v2 v2.1.5/go.mod h1:f2nOkKS1HcjgIWZgDAErgBdxmr2eyt0Kn7IY+FU1Xe4= +github.com/blevesearch/scorch_segment_api/v2 v2.2.2 h1:wRkJ2tAP0HsS4M2pK/qDkNV0PvUo33umcsjzLIgvVA4= +github.com/blevesearch/scorch_segment_api/v2 v2.2.2/go.mod 
h1:7mKEerrxzvfImS2pMvpfV5MGFYVcQ9NBTkBd/ApU7cI= github.com/blevesearch/vellum v1.0.10 h1:HGPJDT2bTva12hrHepVT3rOyIKFFF4t7Gf6yMxyMIPI= github.com/blevesearch/vellum v1.0.10/go.mod h1:ul1oT0FhSMDIExNjIxHqJoGpVrBpKCdgDQNxfqgJt7k= -github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= -github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= -github.com/spf13/cobra v1.4.0 h1:y+wJpx64xcgO1V+RcnwW0LEHxTKRi2ZDPSBjWnrg88Q= -github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g= +github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= +github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= 
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/section.go b/section.go index eff733b8..bb92861f 100644 --- a/section.go +++ b/section.go @@ -15,6 +15,8 @@ package zap import ( + "sync" + "github.com/RoaringBitmap/roaring" index "github.com/blevesearch/bleve_index_api" ) @@ -51,10 +53,25 @@ type resetable interface { Set(key string, value interface{}) } +// ----------------------------------------------------------------------------- + const ( - sectionInvertedIndex = iota + sectionInvertedTextIndex = iota + sectionFaissVectorIndex +) + +// 
----------------------------------------------------------------------------- + +var ( + segmentSectionsMutex sync.Mutex + // writes to segmentSections within init()s ONLY within lock, + // reads will not require lock access + segmentSections = make(map[uint16]section) ) -var segmentSections = map[uint16]section{ - sectionInvertedIndex: &invertedTextIndexSection{}, +// Method to be invoked within init()s ONLY. +func registerSegmentSection(key uint16, val section) { + segmentSectionsMutex.Lock() + segmentSections[key] = val + segmentSectionsMutex.Unlock() } diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go new file mode 100644 index 00000000..c647e91a --- /dev/null +++ b/section_faiss_vector_index.go @@ -0,0 +1,564 @@ +// Copyright (c) 2023 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+//go:build vectors
+// +build vectors
+
+package zap
+
+import (
+	"encoding/binary"
+	"fmt"
+	"math"
+
+	"github.com/RoaringBitmap/roaring"
+	index "github.com/blevesearch/bleve_index_api"
+	faiss "github.com/blevesearch/go-faiss"
+	"golang.org/x/exp/maps"
+)
+
+func init() {
+	registerSegmentSection(sectionFaissVectorIndex, &faissVectorIndexSection{})
+}
+
+// faissVectorIndexSection is the segment section that persists and merges
+// per-field vector indexes. It carries no state of its own; all working
+// state lives in the vectorIndexOpaque stored in the opaque map.
+type faissVectorIndexSection struct {
+}
+
+// Process feeds a single field of document docNum into the vector section.
+// Fields that are not index.VectorField are ignored, and so is the
+// math.MaxUint16 fieldID.
+func (v *faissVectorIndexSection) Process(opaque map[int]resetable, docNum uint64, field index.Field, fieldID uint16) {
+	if fieldID == math.MaxUint16 {
+		return
+	}
+
+	if vf, ok := field.(index.VectorField); ok {
+		vo := v.getvectorIndexOpaque(opaque)
+		vo.process(vf, fieldID, docNum)
+	}
+}
+
+// Persist writes the vector section of every processed field to w.
+// The returned byte count is always 0 (as before); the error reported by
+// the underlying write is now propagated instead of being discarded.
+func (v *faissVectorIndexSection) Persist(opaque map[int]resetable, w *CountHashWriter) (n int64, err error) {
+	vo := v.getvectorIndexOpaque(opaque)
+	if _, err = vo.writeVectorIndexes(w); err != nil {
+		return 0, err
+	}
+	return 0, nil
+}
+
+// AddrForField returns the offset at which the vector section of fieldID
+// was written, or 0 when nothing was written for that field.
+func (v *faissVectorIndexSection) AddrForField(opaque map[int]resetable, fieldID int) int {
+	vo := v.getvectorIndexOpaque(opaque)
+	return vo.fieldAddrs[uint16(fieldID)]
+}
+
+// metadata corresponding to a serialized vector index
+type vecIndexMeta struct {
+	startOffset int     // offset of the serialized index within the segment's mem
+	indexSize   uint64  // length in bytes of the serialized index
+	vecIds      []int64 // vector IDs stored in that index, in on-disk order
+}
+
+// remapDocIDs translates every doc number in oldIDs through newIDs and
+// returns the result as a fresh bitmap. Doc numbers that map to docDropped
+// are left out.
+func remapDocIDs(oldIDs *roaring.Bitmap, newIDs []uint64) *roaring.Bitmap {
+	newBitmap := roaring.NewBitmap()
+
+	for _, oldID := range oldIDs.ToArray() {
+		if newIDs[oldID] != docDropped {
+			newBitmap.Add(uint32(newIDs[oldID]))
+		}
+	}
+	return newBitmap
+}
+
+// Merge combines, field by field, the vector sections of the given segments
+// and writes the merged sections to w.
+//
+// For every field it walks the segments that carry a vector section for it,
+// records where each serialized index lives, and builds the merged
+// vecID -> docIDs mapping expressed in the new document numbers. A field is
+// skipped entirely as soon as one segment knows the field but has no vector
+// section for it.
+//
+// keep in mind with respect to update and delete operations with respect to
+// vectors; leverage the bitmaps stored.
+func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*SegmentBase, drops []*roaring.Bitmap, fieldsInv []string,
+	newDocNumsIn [][]uint64, w *CountHashWriter, closeCh chan struct{}) error {
+	vo := v.getvectorIndexOpaque(opaque)
+
+	// readUvarint decodes one uvarint starting at pos and returns the value
+	// together with the position right after it. Unlike a fixed
+	// mem[pos:pos+MaxVarintLen64] slice it never reaches past the end of mem.
+	readUvarint := func(mem []byte, pos int) (uint64, int, error) {
+		if pos < 0 || pos >= len(mem) {
+			return 0, pos, fmt.Errorf("vector section: offset %d out of bounds", pos)
+		}
+		end := pos + binary.MaxVarintLen64
+		if end > len(mem) {
+			end = len(mem)
+		}
+		val, n := binary.Uvarint(mem[pos:end])
+		if n <= 0 {
+			return 0, pos, fmt.Errorf("vector section: invalid uvarint at offset %d", pos)
+		}
+		return val, pos + n, nil
+	}
+
+LOOP:
+	for fieldID, fieldName := range fieldsInv {
+
+		// indexes[i] describes the serialized index held by vecSegs[i]. Both
+		// slices only contain the segments that contribute to this field, so
+		// they stay aligned even when some segments do not know the field.
+		var indexes []vecIndexMeta
+		var vecSegs []*SegmentBase
+		vecToDocID := make(map[int64]*roaring.Bitmap)
+
+		// todo: would fetching the following stuff from the segments in
+		// parallel be beneficial in terms of perf?
+		for segI, sb := range segments {
+			if isClosed(closeCh) {
+				return fmt.Errorf("merging of vector sections aborted")
+			}
+			fieldIDPlus1, ok := sb.fieldsMap[fieldName]
+			if !ok {
+				continue
+			}
+
+			// check if the section address is a valid one for "fieldName" in the
+			// segment sb. the local fieldID (fetched by the fieldsMap of the sb)
+			// is to be used while consulting the fieldsSectionsMap
+			pos := int(sb.fieldsSectionsMap[fieldIDPlus1-1][sectionFaissVectorIndex])
+			if pos == 0 {
+				continue LOOP
+			}
+
+			var err error
+
+			// loading doc values - adhering to the sections format. never
+			// valid values for vector section
+			if _, pos, err = readUvarint(sb.mem, pos); err != nil {
+				return err
+			}
+			if _, pos, err = readUvarint(sb.mem, pos); err != nil {
+				return err
+			}
+
+			var indexSize uint64
+			if indexSize, pos, err = readUvarint(sb.mem, pos); err != nil {
+				return err
+			}
+
+			meta := vecIndexMeta{
+				startOffset: pos,
+				indexSize:   indexSize,
+			}
+
+			pos += int(indexSize)
+
+			var numVecs uint64
+			if numVecs, pos, err = readUvarint(sb.mem, pos); err != nil {
+				return err
+			}
+			meta.vecIds = make([]int64, 0, numVecs)
+
+			for i := 0; i < int(numVecs); i++ {
+				var vecID, bitMapLen uint64
+				if vecID, pos, err = readUvarint(sb.mem, pos); err != nil {
+					return err
+				}
+				if bitMapLen, pos, err = readUvarint(sb.mem, pos); err != nil {
+					return err
+				}
+				if bitMapLen > uint64(len(sb.mem)-pos) {
+					return fmt.Errorf("vector section: bitmap of %d bytes at offset %d exceeds segment size",
+						bitMapLen, pos)
+				}
+
+				roaringBytes := sb.mem[pos : pos+int(bitMapLen)]
+				pos += int(bitMapLen)
+
+				bitMap := roaring.NewBitmap()
+				if _, err = bitMap.FromBuffer(roaringBytes); err != nil {
+					return err
+				}
+
+				// drops[segI] is expressed in the doc numbers of the OLD
+				// segment, so the tombstones have to be removed before the
+				// bitmap is remapped; applying them afterwards would knock
+				// out unrelated documents of the new numbering.
+				if d := drops[segI]; d != nil && !d.IsEmpty() {
+					bitMap = roaring.AndNot(bitMap, d)
+				}
+
+				// remap the docIDs from the old segment to the new document nos.
+				// provided.
+				bitMap = remapDocIDs(bitMap, newDocNumsIn[segI])
+
+				if existing, ok := vecToDocID[int64(vecID)]; ok && existing != nil {
+					existing.Or(bitMap)
+				} else {
+					vecToDocID[int64(vecID)] = bitMap
+				}
+
+				meta.vecIds = append(meta.vecIds, int64(vecID))
+			}
+
+			indexes = append(indexes, meta)
+			vecSegs = append(vecSegs, sb)
+		}
+
+		if len(indexes) == 0 {
+			// no segment carries a vector index for this field
+			continue
+		}
+
+		err := vo.mergeAndWriteVectorIndexes(fieldID, vecSegs, vecToDocID, indexes, w, closeCh)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// flushVectorSection writes one field's vector section to w and returns the
+// offset at which the section starts. The layout is:
+//
+//	uvarint fieldNotUninverted (twice)
+//	uvarint length of the serialized index, followed by the index bytes
+//	uvarint number of unique vectors
+//	for every vector: its vecID, then its docIDs via writeRoaringWithLen
+func (v *vectorIndexOpaque) flushVectorSection(vecToDocID map[int64]*roaring.Bitmap,
+	serializedIndex []byte, w *CountHashWriter) (int, error) {
+	tempBuf := v.grabBuf(binary.MaxVarintLen64)
+
+	fieldStart := w.Count()
+	// marking the fact that for vector index, doc values isn't valid by
+	// storing fieldNotUninverted values.
+	n := binary.PutUvarint(tempBuf, uint64(fieldNotUninverted))
+	_, err := w.Write(tempBuf[:n])
+	if err != nil {
+		return 0, err
+	}
+	n = binary.PutUvarint(tempBuf, uint64(fieldNotUninverted))
+	_, err = w.Write(tempBuf[:n])
+	if err != nil {
+		return 0, err
+	}
+
+	n = binary.PutUvarint(tempBuf, uint64(len(serializedIndex)))
+	_, err = w.Write(tempBuf[:n])
+	if err != nil {
+		return 0, err
+	}
+
+	// write the vector index data
+	_, err = w.Write(serializedIndex)
+	if err != nil {
+		return 0, err
+	}
+
+	// write the number of unique vectors
+	n = binary.PutUvarint(tempBuf, uint64(len(vecToDocID)))
+	_, err = w.Write(tempBuf[:n])
+	if err != nil {
+		return 0, err
+	}
+
+	for vecID, docIDs := range vecToDocID {
+		// write the vecID
+		_, err := writeUvarints(w, uint64(vecID))
+		if err != nil {
+			return 0, err
+		}
+
+		// write the docIDs
+		_, err = writeRoaringWithLen(docIDs, w, tempBuf)
+		if err != nil {
+			return 0, err
+		}
+	}
+
+	return fieldStart, nil
+}
+
+// todo: naive implementation. need to keep in mind the perf implications and improve on this.
+// perhaps, parallelized merging can help speed things up over here.
+func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*SegmentBase,
+	vecToDocID map[int64]*roaring.Bitmap, indexes []vecIndexMeta, w *CountHashWriter, closeCh chan struct{}) error {
+	// Merges the serialized vector indexes of one field and writes the
+	// resulting section to w, recording its offset in v.fieldAddrs.
+	// indexes[i] must describe the index stored in sbs[i].
+
+	// todo: perhaps making this threshold a config value would be better?
+	const reconstructThreshold = 10000
+
+	if len(indexes) != len(sbs) {
+		// the two slices are walked in lockstep below; a mismatch would read
+		// an index from the wrong segment or run out of bounds.
+		return fmt.Errorf("merging of vector sections: %d index entries for %d segments",
+			len(indexes), len(sbs))
+	}
+	if len(sbs) == 0 {
+		// nothing to merge for this field
+		return nil
+	}
+
+	vecIndexes := make([]*faiss.IndexImpl, 0, len(sbs))
+	for segI, seg := range sbs {
+		// read the index bytes. todo: parallelize this
+		meta := indexes[segI]
+		end := meta.startOffset + int(meta.indexSize)
+		if meta.startOffset < 0 || end < meta.startOffset || end > len(seg.mem) {
+			return fmt.Errorf("merging of vector sections: index bytes [%d, %d) outside of segment",
+				meta.startOffset, end)
+		}
+		vecIndex, err := faiss.ReadIndexFromBuffer(seg.mem[meta.startOffset:end], faiss.IOFlagReadOnly)
+		if err != nil {
+			return err
+		}
+		vecIndexes = append(vecIndexes, vecIndex)
+	}
+	// NOTE(review): the faiss indexes created here are never released
+	// explicitly - confirm whether go-faiss requires a close/delete call.
+
+	var merged *faiss.IndexImpl
+	if len(vecToDocID) > reconstructThreshold {
+		// merging of more complex index types (for eg ivf family) with reconstruction
+		// method.
+
+		// safe to assume that all the indexes are of the same config values, given
+		// that they are extracted from the field mapping info.
+		dims := vecIndexes[0].D()
+		metric := vecIndexes[0].MetricType()
+
+		// reconstruct every vector once, keyed by its ID. the same vector may
+		// be present in several segments; the first reconstruction wins.
+		vecByID := make(map[int64][]float32, len(vecToDocID))
+		for i, vecIndex := range vecIndexes {
+			if isClosed(closeCh) {
+				return fmt.Errorf("merging of vector sections aborted")
+			}
+			ids := indexes[i].vecIds
+			if len(ids) == 0 {
+				continue
+			}
+			// todo: parallelize reconstruction
+			recons, err := vecIndex.ReconstructBatch(int64(len(ids)), ids)
+			if err != nil {
+				return err
+			}
+			if len(recons) != len(ids)*dims {
+				return fmt.Errorf("merging of vector sections: reconstructed %d floats for %d vectors of %d dims",
+					len(recons), len(ids), dims)
+			}
+			for j, id := range ids {
+				if _, seen := vecByID[id]; !seen {
+					vecByID[id] = recons[j*dims : (j+1)*dims]
+				}
+			}
+		}
+
+		// lay the vector data out in exactly the order of finalVecIDs, so the
+		// i-th vector handed to AddWithIDs is the one identified by the i-th ID.
+		finalVecIDs := maps.Keys(vecToDocID)
+		indexData := make([]float32, 0, len(finalVecIDs)*dims)
+		for _, id := range finalVecIDs {
+			vec, ok := vecByID[id]
+			if !ok {
+				return fmt.Errorf("merging of vector sections: vector %d was not reconstructed", id)
+			}
+			indexData = append(indexData, vec...)
+		}
+
+		// todo: perhaps the index type can be chosen from a config and tuned accordingly.
+		ivfIndex, err := faiss.IndexFactory(dims, "IDMap2,IVF100,SQ8", metric)
+		if err != nil {
+			return err
+		}
+
+		ivfIndex, err = ivfIndex.GetIVFSubIndex()
+		if err != nil {
+			return err
+		}
+
+		// the direct map maintained in the IVF index is essential for the
+		// reconstruction of vectors based on vector IDs in the future merges.
+		// the AddWithIDs API also needs a direct map to be set before using.
+		err = ivfIndex.SetDirectMap(2)
+		if err != nil {
+			return err
+		}
+
+		// train the vector index, essentially performs k-means clustering to partition
+		// the data space of indexData such that during the search time, we probe
+		// only a subset of vectors -> non-exhaustive search. could be a time
+		// consuming step when the indexData is large.
+		err = ivfIndex.Train(indexData)
+		if err != nil {
+			return err
+		}
+
+		// the error has to be captured here; checking a stale err would let a
+		// failed add go unnoticed.
+		err = ivfIndex.AddWithIDs(indexData, finalVecIDs)
+		if err != nil {
+			return err
+		}
+
+		merged = ivfIndex
+	} else {
+		// todo: ivf -> flat index when there were huge number of vector deletes for this field
+
+		for i := 1; i < len(vecIndexes); i++ {
+			if isClosed(closeCh) {
+				return fmt.Errorf("merging of vector sections aborted")
+			}
+			err := vecIndexes[0].MergeFrom(vecIndexes[i], 0)
+			if err != nil {
+				return err
+			}
+		}
+
+		merged = vecIndexes[0]
+	}
+
+	mergedIndexBytes, err := faiss.WriteIndexIntoBuffer(merged)
+	if err != nil {
+		return err
+	}
+
+	fieldStart, err := v.flushVectorSection(vecToDocID, mergedIndexBytes, w)
+	if err != nil {
+		return err
+	}
+	v.fieldAddrs[uint16(fieldID)] = fieldStart
+	return nil
+}
+
+// grabBuf returns a slice of exactly size bytes backed by the reusable tmp0
+// buffer, growing tmp0 when its capacity is too small. The contents are not
+// cleared.
+// todo: is it possible to merge this reusable stuff with the interim's tmp0?
+func (v *vectorIndexOpaque) grabBuf(size int) []byte {
+	buf := v.tmp0
+	if cap(buf) < size {
+		buf = make([]byte, size)
+		v.tmp0 = buf
+	}
+	return buf[0:size]
+}
+
+// writeVectorIndexes builds and writes the vector section of every field
+// collected by process, and records each section's start offset in
+// vo.fieldAddrs. The returned offset is always 0.
+func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint64, err error) {
+	// for every fieldID, contents to store over here are:
+	//    1. the serialized representation of the dense vector index.
+	//    2. its constituent vectorID -> {docID} mapping. perhaps a bitmap is enough.
+	tempBuf := vo.grabBuf(binary.MaxVarintLen64)
+	for fieldID, content := range vo.vecFieldMap {
+
+		vecs := make([]float32, 0, len(content.vecs)*int(content.dim))
+		ids := make([]int64, 0, len(content.vecs))
+
+		for hash, info := range content.vecs {
+			vecs = append(vecs, info.vec...)
+			ids = append(ids, int64(hash))
+		}
+
+		// create an index, it's always a flat one for now, because each batch
+		// won't have too many vectors (in order for >100K). todo: will need to revisit
+		// this logic - creating based on configured batch size in scorch.
+		// NOTE(review): content.metric is recorded by process but the index is
+		// always built with faiss.MetricL2 - confirm whether that is intended.
+		flatIndex, err := faiss.IndexFactory(int(content.dim), "IDMap2,Flat", faiss.MetricL2)
+		if err != nil {
+			return 0, err
+		}
+
+		err = flatIndex.Train(vecs)
+		if err != nil {
+			return 0, err
+		}
+
+		// capture the error of the add itself instead of re-checking the
+		// (already nil) error of Train.
+		err = flatIndex.AddWithIDs(vecs, ids)
+		if err != nil {
+			return 0, err
+		}
+
+		// serialize the built index into a byte slice
+		buf, err := faiss.WriteIndexIntoBuffer(flatIndex)
+		if err != nil {
+			return 0, err
+		}
+
+		fieldStart := w.Count()
+		// writing out two offset values to indicate that the current field's
+		// vector section doesn't have valid doc value content within it.
+		n := binary.PutUvarint(tempBuf, uint64(fieldNotUninverted))
+		_, err = w.Write(tempBuf[:n])
+		if err != nil {
+			return 0, err
+		}
+		n = binary.PutUvarint(tempBuf, uint64(fieldNotUninverted))
+		_, err = w.Write(tempBuf[:n])
+		if err != nil {
+			return 0, err
+		}
+
+		// write the length of the index bytes
+		n = binary.PutUvarint(tempBuf, uint64(len(buf)))
+		_, err = w.Write(tempBuf[:n])
+		if err != nil {
+			return 0, err
+		}
+
+		// write the vector index data
+		_, err = w.Write(buf)
+		if err != nil {
+			return 0, err
+		}
+
+		// write the number of unique vectors
+		n = binary.PutUvarint(tempBuf, uint64(len(content.vecs)))
+		_, err = w.Write(tempBuf[:n])
+		if err != nil {
+			return 0, err
+		}
+
+		// fixme: this can cause a write amplification. need to improve this.
+		// todo: might need a reformatting to optimize according to mmap needs.
+		// reformatting idea: storing all the ID mappings towards the end of the
+		// section would help avoid paging in this data as part of a page
+		// (which is to load a non-cacheable info like index). this could help the
+		// paging costs
+		for vecID, info := range content.vecs {
+			// the docIDs tracked for this field only; a bitmap shared across
+			// fields would also list documents that hold the same vector in
+			// some other field.
+			docIDs := info.docIDs
+			// write the vecID
+			_, err := writeUvarints(w, uint64(vecID))
+			if err != nil {
+				return 0, err
+			}
+
+			// write the docIDs
+			_, err = writeRoaringWithLen(docIDs, w, tempBuf)
+			if err != nil {
+				return 0, err
+			}
+		}
+
+		// record the fieldStart value for this section.
+		vo.fieldAddrs[fieldID] = fieldStart
+	}
+	return 0, nil
+}
+
+// process records the vector carried by field for document docNum under
+// fieldID. Fields without a vector and the math.MaxUint16 fieldID are
+// ignored.
+func (vo *vectorIndexOpaque) process(field index.VectorField, fieldID uint16, docNum uint64) {
+	if !vo.init {
+		vo.init = true
+		vo.allocateSpace()
+	}
+	if fieldID == math.MaxUint16 {
+		// doc processing checkpoint. currently nothing to do
+		return
+	}
+
+	vec := field.Vector()
+	if vec == nil {
+		return
+	}
+
+	// NOTE: currently, indexing only unique vectors.
+	vecHash := hashCode(vec)
+
+	// bookkeeping across all fields, maintained exactly as before.
+	// NOTE(review): writeVectorIndexes no longer reads vecIDMap; it can be
+	// dropped once nothing else is confirmed to depend on it.
+	if _, ok := vo.vecIDMap[vecHash]; !ok {
+		vo.vecIDMap[vecHash] = vecInfo{
+			docIDs: roaring.NewBitmap(),
+		}
+	}
+	vo.vecIDMap[vecHash].docIDs.Add(uint32(docNum))
+
+	// tracking the unique vectors for every field, together with the
+	// documents that carry them in THIS field, which will be used later to
+	// construct the vector index and its vecID -> docIDs mapping.
+	content, ok := vo.vecFieldMap[fieldID]
+	if !ok {
+		content = indexContent{
+			vecs:   make(map[uint64]vecInfo),
+			dim:    uint16(field.Dims()),
+			metric: field.Similarity(),
+		}
+		vo.vecFieldMap[fieldID] = content
+	}
+
+	info, ok := content.vecs[vecHash]
+	if !ok || info.docIDs == nil {
+		info = vecInfo{
+			vec:    vec,
+			docIDs: roaring.NewBitmap(),
+		}
+		content.vecs[vecHash] = info
+	}
+	// add the docID to the bitmap
+	info.docIDs.Add(uint32(docNum))
+}
+
+// todo: better hash function?
+// keep the perf aspects in mind with respect to the hash function.
+// random seed based hash golang.
+func hashCode(a []float32) uint64 { + var rv uint64 + for _, v := range a { + rv = uint64(math.Float32bits(v)) + (31 * rv) + } + + return rv +} + +func (v *vectorIndexOpaque) allocateSpace() { + // todo: allocate the space for the opaque contents if possible. + // basically to avoid too many heap allocs and also reuse things +} + +func (v *faissVectorIndexSection) getvectorIndexOpaque(opaque map[int]resetable) *vectorIndexOpaque { + if _, ok := opaque[sectionFaissVectorIndex]; !ok { + opaque[sectionFaissVectorIndex] = v.InitOpaque(nil) + } + return opaque[sectionFaissVectorIndex].(*vectorIndexOpaque) +} + +func (v *faissVectorIndexSection) InitOpaque(args map[string]interface{}) resetable { + rv := &vectorIndexOpaque{ + fieldAddrs: make(map[uint16]int), + vecIDMap: make(map[uint64]vecInfo), + vecFieldMap: make(map[uint16]indexContent), + } + for k, v := range args { + rv.Set(k, v) + } + + return rv +} + +type indexContent struct { + vecs map[uint64]vecInfo + dim uint16 + metric string +} + +type vecInfo struct { + vec []float32 + docIDs *roaring.Bitmap +} + +// todo: document the data structures involved in vector section. +type vectorIndexOpaque struct { + init bool + + fieldAddrs map[uint16]int + + vecIDMap map[uint64]vecInfo + vecFieldMap map[uint16]indexContent + + tmp0 []byte +} + +func (vo *vectorIndexOpaque) Reset() (err error) { + // cleanup stuff over here + + return nil +} +func (v *vectorIndexOpaque) Set(key string, val interface{}) { + +} diff --git a/section_inverted_text_index.go b/section_inverted_text_index.go index d0344bea..a196f706 100644 --- a/section_inverted_text_index.go +++ b/section_inverted_text_index.go @@ -26,6 +26,10 @@ import ( "github.com/blevesearch/vellum" ) +func init() { + registerSegmentSection(sectionInvertedTextIndex, &invertedTextIndexSection{}) +} + type invertedTextIndexSection struct { } @@ -278,7 +282,7 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring. 
} fieldIDPlus1 := uint16(segment.fieldsMap[fieldName]) - if dvIter, exists := segment.fieldDvReaders[sectionInvertedIndex][fieldIDPlus1-1]; exists && + if dvIter, exists := segment.fieldDvReaders[sectionInvertedTextIndex][fieldIDPlus1-1]; exists && dvIter != nil { fdvReadersAvailable = true dvIterClone = dvIter.cloneInto(dvIterClone) @@ -816,10 +820,10 @@ func (i *invertedIndexOpaque) allocateSpace() { } func (i *invertedTextIndexSection) getInvertedIndexOpaque(opaque map[int]resetable) *invertedIndexOpaque { - if _, ok := opaque[sectionInvertedIndex]; !ok { - opaque[sectionInvertedIndex] = i.InitOpaque(nil) + if _, ok := opaque[sectionInvertedTextIndex]; !ok { + opaque[sectionInvertedTextIndex] = i.InitOpaque(nil) } - return opaque[sectionInvertedIndex].(*invertedIndexOpaque) + return opaque[sectionInvertedTextIndex].(*invertedIndexOpaque) } func (i *invertedIndexOpaque) getOrDefineField(fieldName string) int { diff --git a/segment.go b/segment.go index 32d6248e..78512222 100644 --- a/segment.go +++ b/segment.go @@ -366,7 +366,7 @@ func (s *SegmentBase) loadFieldNew(fieldID uint16, addr uint64, fieldSectionAddr := binary.BigEndian.Uint64(s.mem[pos : pos+8]) pos += 8 fieldSectionMap[fieldSectionType] = fieldSectionAddr - if fieldSectionType == sectionInvertedIndex { + if fieldSectionType == sectionInvertedTextIndex { // for the fields which don't have the inverted index, the offset is // 0 and during query time, because there is no valid dictionary we // will just have follow a no-op path. @@ -529,7 +529,6 @@ func (s *SegmentBase) visitStoredFields(vdc *visitDocumentCtx, num uint64, arrayPos[i] = ap } } - value := uncompressed[offset : offset+l] keepGoing = visitor(s.fieldsInv[field], byte(typ), value, arrayPos) } @@ -766,12 +765,12 @@ func (s *Segment) loadDvReadersLegacy() error { if fieldDvReader != nil { // older file formats have docValues corresponding only to inverted index // ignore the rest. 
- if s.fieldDvReaders[sectionInvertedIndex] == nil { - s.fieldDvReaders[sectionInvertedIndex] = make(map[uint16]*docValueReader) + if s.fieldDvReaders[sectionInvertedTextIndex] == nil { + s.fieldDvReaders[sectionInvertedTextIndex] = make(map[uint16]*docValueReader) } // fix the structure of fieldDvReaders // currently it populates the inverted index doc values - s.fieldDvReaders[sectionInvertedIndex][uint16(fieldID)] = fieldDvReader + s.fieldDvReaders[sectionInvertedTextIndex][uint16(fieldID)] = fieldDvReader s.fieldDvNames = append(s.fieldDvNames, s.fieldsInv[fieldID]) } } @@ -833,7 +832,7 @@ func (s *SegmentBase) loadDvReaders() error { return fmt.Errorf("loadDvReaders: failed to read the dataLoc "+ "offset for sectionID %v field %v", secID, s.fieldsInv[fieldID]) } - if secID == sectionInvertedIndex { + if secID == sectionInvertedTextIndex { s.dictLocs = append(s.dictLocs, dataLoc) } fieldDvReader, err := s.loadFieldDocValueReader(s.fieldsInv[fieldID], fieldLocStart, fieldLocEnd)