Skip to content

Commit

Permalink
Remove subject_transform_dest (#4557)
Browse files Browse the repository at this point in the history
Removes the single subject transform destination field any subject
transformation in StreamSources must now be done using the
SubjectTransforms array instead.
  • Loading branch information
neilalexander authored Sep 20, 2023
2 parents 81c0a14 + 9fc2603 commit 0623e4b
Showing 1 changed file with 35 additions and 80 deletions.
115 changes: 35 additions & 80 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,25 +180,23 @@ type PeerInfo struct {

// StreamSourceInfo shows information about an upstream stream source.
type StreamSourceInfo struct {
Name string `json:"name"`
External *ExternalStream `json:"external,omitempty"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
Error *ApiError `json:"error,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"-"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
Name string `json:"name"`
External *ExternalStream `json:"external,omitempty"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
Error *ApiError `json:"error,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
}

// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"-"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`

// Internal
iname string // For indexing when stream names are the same for multiple sources.
Expand Down Expand Up @@ -313,8 +311,7 @@ type sourceInfo struct {
qch chan struct{}
sip bool // setup in progress
wg sync.WaitGroup
sf string // subject filter
tr *subjectTransform
sf string // subject filter
sfs []string // subject filters
trs []*subjectTransform // subject transforms
}
Expand Down Expand Up @@ -477,12 +474,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
jsa.mu.Unlock()
return nil, fmt.Errorf("subject filter '%s' for the mirror %w", cfg.Mirror.FilterSubject, ErrBadSubject)
}
if cfg.Mirror.SubjectTransformDest != _EMPTY_ {
if _, err = NewSubjectTransform(cfg.Mirror.FilterSubject, cfg.Mirror.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the mirror %w", cfg.Mirror.FilterSubject, cfg.Mirror.SubjectTransformDest, err)
}
}
} else {
for _, st := range cfg.Mirror.SubjectTransforms {
if st.Source != _EMPTY_ && !IsValidSubject(st.Source) {
Expand All @@ -508,13 +499,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
jsa.mu.Unlock()
return nil, fmt.Errorf("subject filter '%s' for the source: %w", ssi.FilterSubject, ErrBadSubject)
}
// check the transform, if any, is valid
if ssi.SubjectTransformDest != _EMPTY_ {
if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the source: %w", ssi.FilterSubject, ssi.SubjectTransformDest, err)
}
}
} else {
for _, st := range ssi.SubjectTransforms {
if st.Source != _EMPTY_ && !IsValidSubject(st.Source) {
Expand Down Expand Up @@ -703,7 +687,7 @@ func (ssi *StreamSource) composeIName() string {
}

source := ssi.FilterSubject
destination := ssi.SubjectTransformDest
destination := fwcs

if len(ssi.SubjectTransforms) == 0 {
// normalize filter and destination in case they are empty
Expand Down Expand Up @@ -1271,7 +1255,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
if len(cfg.Sources) > 0 {
return StreamConfig{}, NewJSMirrorWithSourcesError()
}
if (cfg.Mirror.FilterSubject != _EMPTY_ || cfg.Mirror.SubjectTransformDest != _EMPTY_) && len(cfg.Mirror.SubjectTransforms) != 0 {
if cfg.Mirror.FilterSubject != _EMPTY_ && len(cfg.Mirror.SubjectTransforms) != 0 {
return StreamConfig{}, NewJSMirrorMultipleFiltersNotAllowedError()
}
// Check subject filters overlap.
Expand Down Expand Up @@ -1351,7 +1335,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
}
}

if (src.FilterSubject != _EMPTY_ || src.SubjectTransformDest != _EMPTY_) && len(src.SubjectTransforms) != 0 {
if src.FilterSubject != _EMPTY_ && len(src.SubjectTransforms) != 0 {
return StreamConfig{}, NewJSSourceMultipleFiltersNotAllowedError()
}

Expand Down Expand Up @@ -1787,12 +1771,6 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)

if len(s.SubjectTransforms) == 0 {
si = &sourceInfo{name: s.Name, iname: s.iname, sf: s.FilterSubject}
// set for transform if any
var err error
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream source subject transform from '%s' to '%s': %w", s.FilterSubject, s.SubjectTransformDest, err)
}
} else {
si = &sourceInfo{name: s.Name, iname: s.iname}
si.trs = make([]*subjectTransform, len(s.SubjectTransforms))
Expand Down Expand Up @@ -2073,10 +2051,6 @@ func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo {

var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf}

if si.tr != nil {
ssi.SubjectTransformDest = si.tr.dest
}

trConfigs := make([]SubjectTransformConfig, len(si.sfs))
for i := range si.sfs {
destination := _EMPTY_
Expand Down Expand Up @@ -2277,18 +2251,15 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
}

// Do the subject transform if there's one
if mset.mirror.tr != nil {
m.subj = mset.mirror.tr.TransformSubject(m.subj)
} else {
for _, tr := range mset.mirror.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}

for _, tr := range mset.mirror.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}
}
}
Expand Down Expand Up @@ -2506,12 +2477,6 @@ func (mset *stream) setupMirrorConsumer() error {
if mset.cfg.Mirror.FilterSubject != _EMPTY_ {
req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject
mirror.sf = mset.cfg.Mirror.FilterSubject
// Set transform if any
var err error
mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest)
if err != nil {
mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err)
}
}

sfs := make([]string, len(mset.cfg.Mirror.SubjectTransforms))
Expand Down Expand Up @@ -3173,18 +3138,15 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
hdr = genHeader(hdr, JSStreamSource, si.genSourceHeader(m.rply))

// Do the subject transform for the source if there's one
if si.tr != nil {
m.subj = si.tr.TransformSubject(m.subj)
} else {
for _, tr := range si.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}

for _, tr := range si.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}
}
}
Expand Down Expand Up @@ -3345,13 +3307,6 @@ func (mset *stream) startingSequenceForSources() {

if len(ssi.SubjectTransforms) == 0 {
si = &sourceInfo{name: ssi.Name, iname: ssi.iname, sf: ssi.FilterSubject}
// Set the transform if any
// technically no need to check the error as already validated that it will not before
var err error
si.tr, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest)
if err != nil {
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
}
} else {
sfs := make([]string, len(ssi.SubjectTransforms))
trs := make([]*subjectTransform, len(ssi.SubjectTransforms))
Expand Down

0 comments on commit 0623e4b

Please sign in to comment.