From 67ff921557098bf6fa30cdef9ef05c73ae8aaf3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wilson=20J=C3=BAnior?= Date: Sat, 22 Sep 2018 21:00:30 -0300 Subject: [PATCH] Add support to MaxStalenessSeconds in ReadPreference --- session.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ session_test.go | 15 ++++++++++----- socket.go | 8 +++++++- 3 files changed, 63 insertions(+), 6 deletions(-) diff --git a/session.go b/session.go index 4a77432c0..d16dc5131 100644 --- a/session.go +++ b/session.go @@ -304,6 +304,11 @@ const ( // false: Initiate the connection without TLS/SSL. // The default value is false. // +// maxStalenessSeconds= +// +// specify a maximum replication lag, or “staleness” in seconds, for reads from secondaries, minimum value allowed is 90. +// Works on MongoDB 3.4+ +// // Relevant documentation: // // http://docs.mongodb.org/manual/reference/connection-string/ @@ -353,6 +358,7 @@ func ParseURL(url string) (*DialInfo, error) { var readPreferenceTagSets []bson.D minPoolSize := 0 maxIdleTimeMS := 0 + maxStalenessSeconds := 0 safe := Safe{} for _, opt := range uinfo.options { switch opt.key { @@ -390,6 +396,17 @@ func ParseURL(url string) (*DialInfo, error) { if err != nil { return nil, errors.New("bad value for maxPoolSize: " + opt.value) } + case "maxStalenessSeconds": + maxStalenessSeconds, err = strconv.Atoi(opt.value) + + if err != nil { + return nil, errors.New("bad value for maxStalenessSeconds: " + opt.value) + } + + if maxStalenessSeconds > 0 && maxStalenessSeconds < 90 { + return nil, errors.New("maxStalenessSeconds too low " + opt.value + ", must be >= 90 seconds") + } + case "appName": if len(opt.value) > 128 { return nil, errors.New("appName too long, must be < 128 bytes: " + opt.value) @@ -455,6 +472,10 @@ func ParseURL(url string) (*DialInfo, error) { return nil, errors.New("readPreferenceTagSet may not be specified when readPreference is primary") } + if readPreferenceMode == Primary && maxStalenessSeconds > 0 { + return nil, errors.New("maxStalenessSeconds may not be specified when readPreference is primary") + } + info := DialInfo{ Addrs: uinfo.addrs, Direct: direct, @@ -469,6 +490,8 @@ func ParseURL(url string) (*DialInfo, error) { ReadPreference: &ReadPreference{ Mode: readPreferenceMode, TagSets: readPreferenceTagSets, + + MaxStalenessSeconds: maxStalenessSeconds, }, Safe: safe, ReplicaSetName: setName, @@ -607,6 +630,8 @@ func (i *DialInfo) Copy() *DialInfo { if i.ReadPreference != nil { readPreference = &ReadPreference{ Mode: i.ReadPreference.Mode, + + MaxStalenessSeconds: i.ReadPreference.MaxStalenessSeconds, } readPreference.TagSets = make([]bson.D, len(i.ReadPreference.TagSets)) copy(readPreference.TagSets, i.ReadPreference.TagSets) @@ -679,6 +704,9 @@ type ReadPreference struct { // Mode determines the consistency of results. See Session.SetMode. Mode Mode + // MaxStalenessSeconds specify a maximum replication lag, or “staleness” in seconds, for reads from secondaries. + MaxStalenessSeconds int + // TagSets indicates which servers are allowed to be used. See Session.SelectServers. TagSets []bson.D } @@ -768,6 +796,7 @@ func DialWithInfo(dialInfo *DialInfo) (*Session, error) { if info.ReadPreference != nil { session.SelectServers(info.ReadPreference.TagSets...) session.SetMode(info.ReadPreference.Mode, true) + session.SetMaxStalenessSeconds(info.ReadPreference.MaxStalenessSeconds) } else { session.SetMode(Strong, true) } @@ -2190,6 +2219,23 @@ func (s *Session) SetPoolTimeout(timeout time.Duration) { s.m.Unlock() } +// SetMaxStalenessSeconds set the maximum of seconds of replication lag from secondaries +// Works on MongoDB 3.4+ +// +// Relevant documentation: +// +// https://docs.mongodb.com/manual/core/read-preference/#maxstalenessseconds +// +func (s *Session) SetMaxStalenessSeconds(seconds int) error { + s.m.Lock() + defer s.m.Unlock() + if seconds > 0 && seconds < 90 { + return errors.New("SetMaxStalenessSeconds: minimum of seconds is 90") + } + s.queryConfig.op.maxStalenessSeconds = seconds + return nil +} + // SetBypassValidation sets whether the server should bypass the registered // validation expressions executed when documents are inserted or modified, // in the interest of preserving invariants in the collection being modified. diff --git a/session_test.go b/session_test.go index eaa8964f3..79d8b2f2a 100644 --- a/session_test.go +++ b/session_test.go @@ -167,14 +167,17 @@ func (s *S) TestURLReadPreference(c *C) { type test struct { url string mode mgo.Mode + + maxStalenessSeconds int } tests := []test{ - {"localhost:40001?readPreference=primary", mgo.Primary}, - {"localhost:40001?readPreference=primaryPreferred", mgo.PrimaryPreferred}, - {"localhost:40001?readPreference=secondary", mgo.Secondary}, - {"localhost:40001?readPreference=secondaryPreferred", mgo.SecondaryPreferred}, - {"localhost:40001?readPreference=nearest", mgo.Nearest}, + {"localhost:40001?readPreference=primary", mgo.Primary, 0}, + {"localhost:40001?readPreference=primaryPreferred", mgo.PrimaryPreferred, 0}, + {"localhost:40001?readPreference=secondary", mgo.Secondary, 0}, + {"localhost:40001?readPreference=secondaryPreferred", mgo.SecondaryPreferred, 0}, + {"localhost:40001?readPreference=secondary&maxStalenessSeconds=10", mgo.Secondary, 10}, + {"localhost:40001?readPreference=nearest", mgo.Nearest, 0}, } for _, test := range tests { @@ -182,6 +185,7 @@ func (s *S) TestURLReadPreference(c *C) { c.Assert(err, IsNil) c.Assert(info.ReadPreference, NotNil) c.Assert(info.ReadPreference.Mode, Equals, test.mode) + c.Assert(info.ReadPreference.MaxStalenessSeconds, Equals, test.maxStalenessSeconds) } } @@ -189,6 +193,7 @@ func (s *S) TestURLInvalidReadPreference(c *C) { urls := []string{ "localhost:40001?readPreference=foo", "localhost:40001?readPreference=primarypreferred", + "localhost:40001?readPreference=primary&maxStalenessSeconds=90", } for _, url := range urls { _, err := mgo.ParseURL(url) diff --git a/socket.go b/socket.go index bbe65dc16..877b87828 100644 --- a/socket.go +++ b/socket.go @@ -83,6 +83,8 @@ type queryOp struct { hasOptions bool flags queryOpFlags readConcern string + + maxStalenessSeconds int } type queryWrapper struct { @@ -120,11 +122,15 @@ func (op *queryOp) finalQuery(socket *mongoSocket) interface{} { panic(fmt.Sprintf("unsupported read mode: %d", op.mode)) } op.hasOptions = true - op.options.ReadPreference = make(bson.D, 0, 2) + op.options.ReadPreference = make(bson.D, 0, 3) op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "mode", Value: modeName}) if len(op.serverTags) > 0 { op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "tags", Value: op.serverTags}) } + + if op.maxStalenessSeconds > 0 { + op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "maxStalenessSeconds", Value: op.maxStalenessSeconds}) + } } if op.hasOptions { if op.query == nil {