-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpubsub.go
62 lines (51 loc) · 1.45 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package main
import (
"fmt"
"strings"
"cloud.google.com/go/pubsub"
log "github.com/Sirupsen/logrus"
"golang.org/x/net/context"
)
// createPubSubClient builds a Pub/Sub client bound to projectID using a
// background context.
//
// A creation failure is reported through log.Fatalf, so callers never see a
// nil client from the error path.
func createPubSubClient(projectID string) *pubsub.Client {
	client, err := pubsub.NewClient(context.Background(), projectID)
	if err != nil {
		// Fatalf with an explicit separator: Fatal joins its operands
		// Sprint-style, which glues the error text onto the message.
		log.Fatalf("Failed to create Pub/Sub client: %v", err)
	}
	return client
}
// parseProjectAndSubscriptionIDs extracts the project ID and the subscription
// ID from a Pub/Sub subscription path.
//
// A subscription path usually looks like this:
//
//	projects/<projectName>/subscriptions/<subscriptionName>
//
// source is split on "/" and must yield exactly four segments; the second
// segment is returned as the project ID and the fourth as the subscription
// ID. The first and third segments are not compared against the literal
// words "projects" and "subscriptions".
//
// An error is returned when the segment count is not four, or when either ID
// is empty (for example "projects//subscriptions/foo").
func parseProjectAndSubscriptionIDs(source string) (string, string, error) {
	items := strings.Split(source, "/")
	// Empty IDs are rejected here so that they surface as an ordinary error
	// instead of being handed on to client creation.
	if len(items) != 4 || items[1] == "" || items[3] == "" {
		return "", "", fmt.Errorf("Can not parse pubsub source: %s", source)
	}
	return items[1], items[3], nil
}
// runPubSubPoller receives messages from the Pub/Sub subscription named by
// source and passes each payload to handleIncomingMessage together with
// minfoChan and the message's publish time (Unix seconds).
//
// source is parsed with parseProjectAndSubscriptionIDs; a parse failure is
// returned immediately, before any client is created. Otherwise the function
// blocks in Subscription.Receive and returns whatever Receive returns.
//
// Receiving runs under a context derived from ctx. That context is cancelled
// when this function returns, and the client created for the poll is closed
// at the same point.
func runPubSubPoller(ctx context.Context, source string,
	minfoChan chan *metricInfo) error {
	projectID, subID, err := parseProjectAndSubscriptionIDs(source)
	if err != nil {
		return err
	}

	// Always release the derived context, including when Receive returns
	// while the parent ctx is still live.
	cctx, cancel := context.WithCancel(ctx)
	defer cancel()

	log.Debugf("Polling a pub/sub subscription %s of project %s ...",
		subID, projectID)

	client := createPubSubClient(projectID)
	defer func() {
		// The client is owned by this call; close it so it does not
		// outlive the poller.
		if cerr := client.Close(); cerr != nil {
			log.Warnf("Failed to close Pub/Sub client: %v", cerr)
		}
	}()

	sub := client.Subscription(subID)
	return sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		select {
		case <-ctx.Done():
			// The callback context is already done: stop the receive
			// loop and hand the message back.
			cancel()
			msg.Nack()
			return
		default:
			// Handling is detached from the callback, so the callback
			// returns before the message is acked.
			go func() {
				log.Debugf("Got new message: %v", string(msg.Data))
				handleIncomingMessage(minfoChan, msg.Data, msg.PublishTime.Unix())
				msg.Ack() // XXX: should I ack if an error has happened?
			}()
		}
	})
}