From d9b4b6a69cecd382ba116a7be5abde00e022590a Mon Sep 17 00:00:00 2001 From: Ming Date: Fri, 28 Aug 2020 13:45:53 -0400 Subject: [PATCH] Batch messages poll endpoint (#65) * support message poll rest endpoint * replace mongo install action in the github actions * returns 204 no-content if the message list is empty --- .github/workflows/go.yml | 34 +++++++++++----------- README.md | 18 +++++++++++- config/pulsar_beam.yml | 3 +- go.sum | 3 ++ src/broker/sse-broker.go | 42 ++++++++++++++++++++++----- src/model/message.go | 57 ++++++++++++++++++++++++++++++++++++ src/route/handlers.go | 58 ++++++++++++++++++++++++++++++------- src/route/routes.go | 7 +++++ src/unit-test/model_test.go | 14 +++++++++ src/unit-test/util_test.go | 46 +++++++++++++++++++++++++++++ src/util/util.go | 34 +++++++++++++++++++++- 11 files changed, 279 insertions(+), 37 deletions(-) create mode 100644 src/model/message.go create mode 100644 src/unit-test/model_test.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index e7a28c7..f32f59b 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -40,6 +40,9 @@ jobs: build_test: name: build and test runs-on: ubuntu-latest + strategy: + matrix: + mongodb-version: [4.2] steps: - name: Set up Go uses: actions/setup-go@v1 @@ -51,16 +54,13 @@ jobs: with: fetch-depth: 1 path: go/src/github.com/kafkaesque-io/pulsar-beam - - name: Install MongoDB - run: | - echo $GITHUB_EVENT_NAME - echo $GITHUB_EVENT_PATH - sudo apt-get update - sudo apt-get install -y mongodb + - name: Start MongoDB v${{ matrix.mongodb-version }} + uses: supercharge/mongodb-github-action@1.2.0 + with: + mongodb-version: ${{ matrix.mongodb-version }} - name: Verify MongoDB Installation and Status run: | - ls /var/lib/mongodb - sudo systemctl status mongodb + sudo docker ps - name: Build Binary run: | go mod download @@ -111,7 +111,10 @@ jobs: e2e_test: name: e2e_test needs: [analysis, build_test] - runs-on: ubuntu-latest + runs-on: ubuntu-latest + strategy: + matrix: + mongodb-version: [4.2] steps: - name: Check out code uses: actions/checkout@v1 @@ -123,16 +126,13 @@ jobs: run: | pwd go mod download - - name: Install MongoDB - run: | - echo $GITHUB_EVENT_NAME - echo $GITHUB_EVENT_PATH - sudo apt-get update - sudo apt-get install -y mongodb + - name: Start MongoDB v${{ matrix.mongodb-version }} + uses: supercharge/mongodb-github-action@1.2.0 + with: + mongodb-version: ${{ matrix.mongodb-version }} - name: Verify MongoDB Installation and Status run: | - ls /var/lib/mongodb - sudo systemctl status mongodb + sudo docker ps - name: Set up root certificate env: PULSAR_CLIENT_CERT: ${{ secrets.PULSAR_CLIENT_CERT }} diff --git a/README.md b/README.md index 532c1d7..b69f696 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ Beam is an http based streaming and queueing system backed up by Apache Pulsar. - [x] A message can be pushed to a webhook or Cloud Function for consumption. - [x] A webhook or Cloud Function receives a message, process it and reply another message, in a response body, back to another Pulsar topic via Pulsar Beam. - [x] Messages can be streamed via HTTP Sever Sent Event, [SSE](https://www.html5rocks.com/en/tutorials/eventsource/basics/) +- [x] Support HTTP polling of batch messages Opening an issue and PR are welcomed! Please email `contact@kafkaesque.io` for any inquiry or demo. @@ -59,6 +60,21 @@ Query parameters 2. SubscriptionInitialPosition -> supported type are `latest` as default and `earliest` 3. SubscriptionName -> the length must be 5 characters or longer. An auto-generated name will be provided in absence. Only the auto-generated subscription will be unsubscribed. +### Endpoint to poll batch messages +Polls a batch of messages always from the earliest subscription position from a topic. +``` +/v2/poll/{persistent}/{tenant}/{namespace}/{topic} +``` +These HTTP headers may be required to map to Pulsar topic. +1. Authorization -> Bearer token as Pulsar token +2. PulsarUrl -> *optional* a fully qualified pulsar or pulsar+ssl URL where the message should be sent to. It is optional. The message will be sent to Pulsar URL specified under `PulsarBrokerURL` in the pulsar-beam.yml file if it is absent. + +Query parameters +1. SubscriptionType -> Supported type strings are `exclusive` as default, `shared`, and `failover` +2. SubscriptionName -> the length must be 5 characters or longer. An auto-generated name will be provided in absence. Only the auto-generated subscription will be unsubscribed. +3. size -> The batch size. The default is 10. +4. perMessageTimeoutMs -> is a wait time out for the next message to arrive. It is in milliseconds per message. The default is 300ms. + ### Webhook registration Webhook registration is done via REST API backed by a database of your choice, such as MongoDB, in momery cache, and Pulsar itself. Yes, you can use a compacted Pulsar topic as a database table to perform CRUD. The configuration parameter is `"PbDbType": "inmemory",` in the `pulsar_beam.yml` file or the env variable `PbDbType`. @@ -149,7 +165,7 @@ One end to end test is under `./src/e2e/e2etest.go`, that performs the following 4. Verify the replied message on the sink topic 5. Delete the topic and its webhook document via RESTful API -Since the set up is non-trivial involving Pulsar Beam, a Cloud function or webhook, the test tool, and Pulsar itself with SSL, we recommend to take advantage of [the free plan at Kafkaesque.io](https://kafkaesque.io) as the Pulsar server and a Cloud Function that we have verified GCP Fcuntion, Azure Function or AWS Lambda will suffice in the e2e flow. +Since the set up is non-trivial involving Pulsar Beam, a Cloud function or webhook, the test tool, and Pulsar itself with SSL, we recommend to take advantage of [the free plan at kesque.com](https://kesque.com) as the Pulsar server and a Cloud Function that we have verified GCP Fcuntion, Azure Function or AWS Lambda will suffice in the e2e flow. Step to perform unit test ```bash diff --git a/config/pulsar_beam.yml b/config/pulsar_beam.yml index d45c2a3..35a28f5 100644 --- a/config/pulsar_beam.yml +++ b/config/pulsar_beam.yml @@ -7,4 +7,5 @@ PulsarPrivateKey: ./unit-test/example_private_key PbDbInterval: 10s DbConnectionStr: mongodb://localhost:27017 DbName: -DbPassword: \ No newline at end of file +DbPassword: +TrustStore: "/etc/ssl/certs/ca-bundle.crt" \ No newline at end of file diff --git a/go.sum b/go.sum index 3211416..b833bda 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,7 @@ github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod h1:Crc/GCZ3NXD github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= @@ -79,6 +80,7 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= @@ -113,6 +115,7 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/tidwall/pretty v1.0.1/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= diff --git a/src/broker/sse-broker.go b/src/broker/sse-broker.go index 544c232..b4d242f 100644 --- a/src/broker/sse-broker.go +++ b/src/broker/sse-broker.go @@ -1,15 +1,13 @@ package broker import ( + "strings" + "time" + "github.com/apache/pulsar-client-go/pulsar" + "github.com/kafkaesque-io/pulsar-beam/src/model" "github.com/kafkaesque-io/pulsar-beam/src/pulsardriver" -) - -const ( - // SSEBrokerMaxSize the max size of the number of HTTP SSE session is supported - SSEBrokerMaxSize = 200 - - // TODO add counters and max limit for SSEBroker + log "github.com/sirupsen/logrus" ) // GetPulsarClientConsumer returns Puslar client and consumer interface objects @@ -31,3 +29,33 @@ func GetPulsarClientConsumer(url, token, topic, subscriptionName string, subType return client, consumer, nil } + +// PollBatchMessages polls a batch of consumer messages +func PollBatchMessages(url, token, topic, subscriptionName string, subType pulsar.SubscriptionType, size, perMessageTimeoutMs int) (model.PulsarMessages, error) { + log.Infof("getbatchmessages called") + client, consumer, err := GetPulsarClientConsumer(url, token, topic, subscriptionName, subType, pulsar.SubscriptionPositionEarliest) + if err != nil { + return model.NewPulsarMessages(size), err + } + if strings.HasPrefix(subscriptionName, model.NonResumable) { + defer consumer.Unsubscribe() + } + defer consumer.Close() + defer client.Close() + + messages := model.NewPulsarMessages(size) + consumChan := consumer.Chan() + for i := 0; i < size; i++ { + select { + case msg := <-consumChan: + // log.Infof("received message %s on topic %s", string(msg.Payload()), msg.Topic()) + messages.AddPulsarMessage(msg) + consumer.Ack(msg) + + case <-time.After(time.Duration(perMessageTimeoutMs) * time.Millisecond): //TODO: this should be configurable + i = size + } + } + + return messages, nil +} diff --git a/src/model/message.go b/src/model/message.go new file mode 100644 index 0000000..17dd090 --- /dev/null +++ b/src/model/message.go @@ -0,0 +1,57 @@ +package model + +import ( + "fmt" + "time" + + "github.com/apache/pulsar-client-go/pulsar" +) + +// PulsarMessage is the Pulsar Message type +type PulsarMessage struct { + Payload []byte `json:"payload"` + Topic string `json:"topic"` + EventTime time.Time `json:"eventTime"` + PublishTime time.Time `json:"publishTime"` + MessageID string `json:"messageId"` + Key string `json:"key"` +} + +// PulsarMessages encapsulates a list of messages to be returned to a client +type PulsarMessages struct { + Limit int `json:"limit"` + Size int `json:"size"` + Messages []PulsarMessage `json:"messages"` +} + +// NewPulsarMessages create a PulsarMessages object +func NewPulsarMessages(initSize int) PulsarMessages { + return PulsarMessages{ + Limit: initSize, + Size: 0, + Messages: make([]PulsarMessage, 0), + } +} + +// AddPulsarMessage adds a Pulsar Message to the payload, return true if reaches capacity +func (msgs *PulsarMessages) AddPulsarMessage(msg pulsar.Message) bool { + if msgs.Size >= msgs.Limit { + return true + } + msgs.Messages = append(msgs.Messages, PulsarMessage{ + Payload: msg.Payload(), + Topic: msg.Topic(), + EventTime: msg.EventTime(), + PublishTime: msg.PublishTime(), + MessageID: fmt.Sprintf("%+v", msg.ID()), + Key: msg.Key(), + }) + msgs.Size++ + + return msgs.Size >= msgs.Limit +} + +// IsEmpty checks if the message list is empty +func (msgs *PulsarMessages) IsEmpty() bool { + return msgs.Size == 0 +} diff --git a/src/route/handlers.go b/src/route/handlers.go index 5e30cf6..97b441e 100644 --- a/src/route/handlers.go +++ b/src/route/handlers.go @@ -59,7 +59,6 @@ func TokenSubjectHandler(w http.ResponseWriter, r *http.Request) { return } w.Write(respJSON) - w.WriteHeader(http.StatusOK) } return } @@ -106,15 +105,55 @@ func ReceiveHandler(w http.ResponseWriter, r *http.Request) { return } +// recoverHandler a function recovers from panic +func recoverHandler(r *http.Request) { + if r := recover(); r != nil { + fmt.Printf("Recovered in http handler crash %v", r) + } else { + fmt.Printf("exit http handler") + } +} + +// PollHandler polls messages from a Pulsar topic. +func PollHandler(w http.ResponseWriter, r *http.Request) { + defer recoverHandler(r) + + u, _ := url.Parse(r.URL.String()) + params := u.Query() + token, topicFN, pulsarURL, subName, _, subType, err := ConsumerConfigFromHTTPParts(util.AllowedPulsarURLs, &r.Header, mux.Vars(r), params) + if err != nil { + util.ResponseErrorJSON(err, w, http.StatusUnprocessableEntity) + return + } + w.Header().Set("Content-Type", "application/json; charset=UTF-8") + + size := util.QueryParamInt(params, "batch", 10) + perMessageTimeoutMs := util.QueryParamInt(params, "perMessageTimeoutMs", 300) + + // subscription initial position is always set to earliest since this is short poll + msgs, err := broker.PollBatchMessages(pulsarURL, token, topicFN, subName, subType, size, perMessageTimeoutMs) + if err != nil { + util.ResponseErrorJSON(err, w, http.StatusInternalServerError) + return + } + + if msgs.IsEmpty() { + w.WriteHeader(http.StatusNoContent) + return + } + + data, err := json.Marshal(msgs) + if err != nil { + util.ResponseErrorJSON(err, w, http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + w.Write(data) +} + // SSEHandler is the HTTP SSE handler func SSEHandler(w http.ResponseWriter, r *http.Request) { - defer func() { - if r := recover(); r != nil { - fmt.Printf("Recovered in SSEHandler %v", r) - } else { - fmt.Printf("exit SSEHandler()") - } - }() + defer recoverHandler(r) u, _ := url.Parse(r.URL.String()) params := u.Query() @@ -189,7 +228,6 @@ func GetTopicHandler(w http.ResponseWriter, r *http.Request) { util.ResponseErrorJSON(err, w, http.StatusInternalServerError) } else { w.Header().Set("Content-Type", "application/json; charset=UTF-8") - w.WriteHeader(http.StatusOK) w.Write(resJSON) } @@ -199,6 +237,7 @@ func GetTopicHandler(w http.ResponseWriter, r *http.Request) { func UpdateTopicHandler(w http.ResponseWriter, r *http.Request) { decoder := json.NewDecoder(r.Body) defer r.Body.Close() + w.Header().Set("Content-Type", "application/json; charset=UTF-8") var doc model.TopicConfig err := decoder.Decode(&doc) @@ -270,7 +309,6 @@ func DeleteTopicHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) } else { w.Header().Set("Content-Type", "application/json; charset=UTF-8") - w.WriteHeader(http.StatusOK) w.Write(resJSON) } } diff --git a/src/route/routes.go b/src/route/routes.go index 6792abc..b09cc1c 100644 --- a/src/route/routes.go +++ b/src/route/routes.go @@ -72,6 +72,13 @@ var ReceiverRoutes = Routes{ SSEHandler, middleware.AuthVerifyJWT, }, + Route{ + "poll-messages", + http.MethodGet, + "/v2/poll/{persistent}/{tenant}/{namespace}/{topic}", + PollHandler, + middleware.AuthVerifyJWT, + }, } // RestRoutes definition diff --git a/src/unit-test/model_test.go b/src/unit-test/model_test.go new file mode 100644 index 0000000..1b038ee --- /dev/null +++ b/src/unit-test/model_test.go @@ -0,0 +1,14 @@ +package tests + +import ( + "testing" + + . "github.com/kafkaesque-io/pulsar-beam/src/model" +) + +func TestPulsarMessages(t *testing.T) { + + messages := NewPulsarMessages(10) + equals(t, messages.Limit, 10) + equals(t, messages.IsEmpty(), true) +} diff --git a/src/unit-test/util_test.go b/src/unit-test/util_test.go index 541a7b8..258ee2b 100644 --- a/src/unit-test/util_test.go +++ b/src/unit-test/util_test.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "net/http" + "net/url" "os" "strconv" "strings" @@ -392,5 +393,50 @@ func TestStringToBoo(t *testing.T) { assert(t, !StringToBool("notok"), "string notok yields boolean false") assert(t, !StringToBool("disable"), "string disable yields boolean false") assert(t, !StringToBool("adsfasdf"), "string any string yields boolean false") +} + +func TestTokenizeTopicFullName(t *testing.T) { + isPersistent, tenant, ns, topic, err := TokenizeTopicFullName("persistent://public/default/test-topic") + errNil(t, err) + equals(t, isPersistent, true) + equals(t, tenant, "public") + equals(t, ns, "default") + equals(t, topic, "test-topic") + + isPersistent, tenant, ns, topic, err = TokenizeTopicFullName("non-persistent://public/default/test-topic") + errNil(t, err) + equals(t, isPersistent, false) + equals(t, tenant, "public") + equals(t, ns, "default") + equals(t, topic, "test-topic") + + _, _, _, _, err = TokenizeTopicFullName("persitent://public/default/test-topic") + assertErr(t, "invalid persistent or non-persistent part", err) + + isPersistent, tenant, ns, topic, err = TokenizeTopicFullName("non-persistent://public/default") + errNil(t, err) + equals(t, isPersistent, false) + equals(t, tenant, "public") + equals(t, ns, "default") + equals(t, topic, "") + + _, _, _, _, err = TokenizeTopicFullName("non-persistent://public") + assertErr(t, "missing tenant, namespace, or topic name", err) + + _, _, _, _, err = TokenizeTopicFullName("non-persistent://public/default/to2/to3") + assertErr(t, "missing tenant, namespace, or topic name", err) +} +func TestHTTPParams(t *testing.T) { + params := url.Values{} + params.Set("var1", "testme") + params.Set("var2", "48") + params.Set("var3", "7") + equals(t, QueryParamInt(params, "var1", 5), 5) + equals(t, QueryParamInt(params, "var2", 5), 48) + equals(t, QueryParamInt(params, "var22", 5), 5) + + equals(t, QueryParamString(params, "var1", "48"), "testme") + equals(t, QueryParamString(params, "var2", "test"), "48") + equals(t, QueryParamString(params, "var22", "another"), "another") } diff --git a/src/util/util.go b/src/util/util.go index 94172eb..1093f19 100644 --- a/src/util/util.go +++ b/src/util/util.go @@ -71,7 +71,6 @@ func ReceiverHeader(allowedClusters []string, h *http.Header) (token, topicFN, p } else if pulsarURL == "" { return "", "", "", fmt.Errorf("missing configured Pulsar URL") } - fmt.Printf("pulsarURL is %s\n", pulsarURL) return token, topicFN, pulsarURL, nil } @@ -142,3 +141,36 @@ func QueryParamString(params url.Values, name, defaultValue string) string { } return defaultValue } + +// QueryParamInt returns a URL query parameter's value with a default value +func QueryParamInt(params url.Values, name string, defaultValue int) int { + if str, ok := params[name]; ok { + if num, err := strconv.Atoi(str[0]); err == nil { + return num + } + } + return defaultValue +} + +// TokenizeTopicFullName tokenizes a topic full name into persistent, tenant, namespace, and topic name. +func TokenizeTopicFullName(topicFn string) (isPersistent bool, tenant, namespace, topic string, err error) { + var topicRoute string + if strings.HasPrefix(topicFn, "persistent://") { + topicRoute = strings.Replace(topicFn, "persistent://", "", 1) + isPersistent = true + } else if strings.HasPrefix(topicFn, "non-persistent://") { + topicRoute = strings.Replace(topicFn, "non-persistent://", "", 1) + } else { + return false, "", "", "", fmt.Errorf("invalid persistent or non-persistent part") + } + + parts := strings.Split(topicRoute, "/") + if len(parts) == 3 { + return isPersistent, parts[0], parts[1], parts[2], nil + } else if len(parts) == 2 { + return isPersistent, parts[0], parts[1], "", nil + } else { + return false, "", "", "", fmt.Errorf("missing tenant, namespace, or topic name") + } + +}