-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathsubscriber.go
110 lines (95 loc) · 2.53 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package reactor
import (
"context"
"math"
)
// RequestInfinite means request items indefinitely.
const RequestInfinite = math.MaxInt32
// Subscription represents a one-to-one lifecycle of a Subscriber subscribing to a Publisher.
type Subscription interface {
// Request requests the next N items.
Request(n int)
// Cancel cancels the current lifecycle of subscribing.
Cancel()
}
// Subscriber is the basic type to subscribing the Publisher and consumes the items from upstream.
type Subscriber interface {
// OnComplete is successful terminal state.
OnComplete()
// OnError is failed terminal state.
OnError(error)
// OnNext is invoked when a data notification sent by the Publisher in response to requests to Subscription.Request(int).
OnNext(Any)
// OnSubscribe is invoked after calling RawPublisher.SubscribeWith(context.Context, Subscriber).
OnSubscribe(context.Context, Subscription)
}
type subscriber struct {
onSubscribe FnOnSubscribe
onNext FnOnNext
onComplete FnOnComplete
onError FnOnError
}
func (p *subscriber) OnComplete() {
if p == nil || p.onComplete == nil {
return
}
p.onComplete()
}
func (p *subscriber) OnError(err error) {
if p == nil || p.onError == nil {
return
}
p.onError(err)
}
func (p *subscriber) OnSubscribe(ctx context.Context, s Subscription) {
if p == nil || p.onSubscribe == nil {
s.Request(RequestInfinite)
} else {
p.onSubscribe(ctx, s)
}
}
func (p *subscriber) OnNext(i Any) {
if p == nil || p.onNext == nil {
return
}
if err := p.onNext(i); err != nil {
p.OnError(err)
}
}
// SubscriberOption is used to create a Subscriber easily.
type SubscriberOption func(*subscriber)
// OnNext specified a Subscriber.OnNext action.
func OnNext(onNext FnOnNext) SubscriberOption {
return func(s *subscriber) {
s.onNext = onNext
}
}
// OnComplete specified a Subscriber.OnComplete action.
func OnComplete(onComplete FnOnComplete) SubscriberOption {
return func(s *subscriber) {
s.onComplete = onComplete
}
}
// OnError specified a Subscriber.OnError action.
func OnError(onError FnOnError) SubscriberOption {
return func(i *subscriber) {
i.onError = onError
}
}
// OnSubscribe specified a Subscriber.OnSubscribe action.
func OnSubscribe(onSubscribe FnOnSubscribe) SubscriberOption {
return func(i *subscriber) {
i.onSubscribe = onSubscribe
}
}
// NewSubscriber creates a Subscriber with given options.
func NewSubscriber(opts ...SubscriberOption) Subscriber {
if len(opts) < 1 {
return (*subscriber)(nil)
}
s := &subscriber{}
for _, opt := range opts {
opt(s)
}
return s
}