From ad7fb14c586851d7462dffef8252b0f6387849a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wilson=20J=C3=BAnior?= Date: Sat, 22 Sep 2018 21:00:30 -0300 Subject: [PATCH] Add support to MaxStalenessSeconds in ReadPreference --- session.go | 30 ++++++++++++++++++++++++++++++ session_test.go | 14 +++++++++----- socket.go | 8 +++++++- 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/session.go b/session.go index 4a77432c0..9ea6f884e 100644 --- a/session.go +++ b/session.go @@ -353,6 +353,7 @@ func ParseURL(url string) (*DialInfo, error) { var readPreferenceTagSets []bson.D minPoolSize := 0 maxIdleTimeMS := 0 + maxStalenessSeconds := 0 safe := Safe{} for _, opt := range uinfo.options { switch opt.key { @@ -390,6 +391,11 @@ func ParseURL(url string) (*DialInfo, error) { if err != nil { return nil, errors.New("bad value for maxPoolSize: " + opt.value) } + case "maxStalenessSeconds": + maxStalenessSeconds, err = strconv.Atoi(opt.value) + if err != nil { + return nil, errors.New("bad value for maxStalenessSeconds: " + opt.value) + } case "appName": if len(opt.value) > 128 { return nil, errors.New("appName too long, must be < 128 bytes: " + opt.value) @@ -469,6 +475,8 @@ func ParseURL(url string) (*DialInfo, error) { ReadPreference: &ReadPreference{ Mode: readPreferenceMode, TagSets: readPreferenceTagSets, + + MaxStalenessSeconds: maxStalenessSeconds, }, Safe: safe, ReplicaSetName: setName, @@ -607,6 +615,8 @@ func (i *DialInfo) Copy() *DialInfo { if i.ReadPreference != nil { readPreference = &ReadPreference{ Mode: i.ReadPreference.Mode, + + MaxStalenessSeconds: i.ReadPreference.MaxStalenessSeconds, } readPreference.TagSets = make([]bson.D, len(i.ReadPreference.TagSets)) copy(readPreference.TagSets, i.ReadPreference.TagSets) @@ -679,6 +689,9 @@ type ReadPreference struct { // Mode determines the consistency of results. See Session.SetMode. Mode Mode + // MaxStalenessSeconds specify a maximum replication lag, or “staleness” in seconds, for reads from secondaries. + MaxStalenessSeconds int + // TagSets indicates which servers are allowed to be used. See Session.SelectServers. TagSets []bson.D } @@ -768,6 +781,7 @@ func DialWithInfo(dialInfo *DialInfo) (*Session, error) { if info.ReadPreference != nil { session.SelectServers(info.ReadPreference.TagSets...) session.SetMode(info.ReadPreference.Mode, true) + session.SetMaxStalenessSeconds(info.ReadPreference.MaxStalenessSeconds) } else { session.SetMode(Strong, true) } @@ -2190,6 +2204,22 @@ func (s *Session) SetPoolTimeout(timeout time.Duration) { s.m.Unlock() } +// SetMaxStalenessSeconds set the maximum of seconds of replication lag from secondaries +// +// Relevant documentation: +// +// https://docs.mongodb.com/manual/core/read-preference/#maxstalenessseconds +// +func (s *Session) SetMaxStalenessSeconds(seconds int) { + s.m.Lock() + if seconds > -1 { + s.queryConfig.op.maxStalenessSeconds = seconds + } else { + panic("SetMaxStalenessSeconds: support only positive numbers") + } + s.m.Unlock() +} + // SetBypassValidation sets whether the server should bypass the registered // validation expressions executed when documents are inserted or modified, // in the interest of preserving invariants in the collection being modified. diff --git a/session_test.go b/session_test.go index eaa8964f3..358274329 100644 --- a/session_test.go +++ b/session_test.go @@ -167,14 +167,17 @@ func (s *S) TestURLReadPreference(c *C) { type test struct { url string mode mgo.Mode + + maxStalenessSeconds int } tests := []test{ - {"localhost:40001?readPreference=primary", mgo.Primary}, - {"localhost:40001?readPreference=primaryPreferred", mgo.PrimaryPreferred}, - {"localhost:40001?readPreference=secondary", mgo.Secondary}, - {"localhost:40001?readPreference=secondaryPreferred", mgo.SecondaryPreferred}, - {"localhost:40001?readPreference=nearest", mgo.Nearest}, + {"localhost:40001?readPreference=primary&maxStalenessSeconds=10", mgo.Primary, 10}, + {"localhost:40001?readPreference=primary", mgo.Primary, 0}, + {"localhost:40001?readPreference=primaryPreferred", mgo.PrimaryPreferred, 0}, + {"localhost:40001?readPreference=secondary", mgo.Secondary, 0}, + {"localhost:40001?readPreference=secondaryPreferred", mgo.SecondaryPreferred, 0}, + {"localhost:40001?readPreference=nearest", mgo.Nearest, 0}, } for _, test := range tests { @@ -182,6 +185,7 @@ func (s *S) TestURLReadPreference(c *C) { c.Assert(err, IsNil) c.Assert(info.ReadPreference, NotNil) c.Assert(info.ReadPreference.Mode, Equals, test.mode) + c.Assert(info.ReadPreference.MaxStalenessSeconds, Equals, test.maxStalenessSeconds) } } diff --git a/socket.go b/socket.go index bbe65dc16..877b87828 100644 --- a/socket.go +++ b/socket.go @@ -83,6 +83,8 @@ type queryOp struct { hasOptions bool flags queryOpFlags readConcern string + + maxStalenessSeconds int } type queryWrapper struct { @@ -120,11 +122,15 @@ func (op *queryOp) finalQuery(socket *mongoSocket) interface{} { panic(fmt.Sprintf("unsupported read mode: %d", op.mode)) } op.hasOptions = true - op.options.ReadPreference = make(bson.D, 0, 2) + op.options.ReadPreference = make(bson.D, 0, 3) op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "mode", Value: modeName}) if len(op.serverTags) > 0 { op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "tags", Value: op.serverTags}) } + + if op.maxStalenessSeconds > 0 { + op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "maxStalenessSeconds", Value: op.maxStalenessSeconds}) + } } if op.hasOptions { if op.query == nil {