diff --git a/go.mod b/go.mod index 1b70d8b..8603be6 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.14 require ( github.com/caarlos0/env/v6 v6.2.1 - github.com/confluentinc/confluent-kafka-go v1.4.2 github.com/gin-gonic/gin v1.6.2 go.mongodb.org/mongo-driver v1.3.2 go.uber.org/zap v1.15.0 diff --git a/go.sum b/go.sum index 88489a1..702a3fa 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/caarlos0/env/v6 v6.2.1 h1:/bFpX1dg4TNioJjg7mrQaSrBoQvRfLUHNfXivdFbbEo= github.com/caarlos0/env/v6 v6.2.1/go.mod h1:3LpmfcAYCG6gCiSgDLaFR5Km1FRpPwFvBbRcjHar6Sw= -github.com/confluentinc/confluent-kafka-go v1.4.2 h1:13EK9RTujF7lVkvHQ5Hbu6bM+Yfrq8L0MkJNnjHSd4Q= -github.com/confluentinc/confluent-kafka-go v1.4.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= @@ -107,8 +105,6 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= -go.uber.org/zap v1.14.1 h1:nYDKopTbvAPq/NrUVZwT15y2lpROBiLLyoRTbXOYWOo= -go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= go.uber.org/zap v1.15.0 h1:ZZCA22JRF2gQE5FoNmhmrf7jeJJ2uhqDUNRYKm8dvmM= go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/main.go b/main.go index face9ec..edf2ae9 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,6 @@ import ( "github.com/siesgstarena/epicentre/logger" routes "github.com/siesgstarena/epicentre/router" "github.com/siesgstarena/epicentre/services/mongo" - "github.com/siesgstarena/epicentre/services/kafka" ) func main() { @@ -25,27 +24,11 @@ func main() { mongo.LoadMongo() - err = kafka.LoadKafka() - if err != nil { - panic(err) - } - logger.Log.Info("Kafka Installed Successfully") - router := gin.Default() routes.LoadRouter(router) router.Run(":" + config.Config.Port) - err = kafka.ProduceMessage("Testing Kafka Implementation") - if err != nil { - panic(err) - } - - err = kafka.ConsumeMessage() - if err != nil { - panic(err) - } - // defer mongo.Client.Disconnect(*mongo.Ctx) } diff --git a/services/kafka/consumer.go b/services/kafka/consumer.go deleted file mode 100644 index e0c3e36..0000000 --- a/services/kafka/consumer.go +++ /dev/null @@ -1,54 +0,0 @@ -package kafka - -import ( - "fmt" - "os" - "os/signal" - "syscall" - kafkaConfiguration "github.com/siesgstarena/epicentre/config" - "github.com/confluentinc/confluent-kafka-go/kafka" -) - -// ConsumeMessage This function can be used to receive message on the topic -func ConsumeMessage() error { - topic := fmt.Sprintf("%sdefault", kafkaConfiguration.Config.KafkaTopicPrefix) - sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - err := Consumer.Subscribe(topic, nil) - if err != nil { - return err - } - run := true - counter := 0 - commitAfter := 1000 - for run == true { - select { - case sig := <-sigchan: - fmt.Printf("Caught signal %v: terminating\n", sig) - run = false - case ev := <-Consumer.Events(): - switch e := ev.(type) { - case kafka.AssignedPartitions: - Consumer.Assign(e.Partitions) - case kafka.RevokedPartitions: - Consumer.Unassign() - case *kafka.Message: - fmt.Printf("%% Message on %s: %s\n", e.TopicPartition, string(e.Value)) - counter++ - if counter > commitAfter { - Consumer.Commit() - counter = 0 - } - case kafka.PartitionEOF: - fmt.Printf("%% Reached %v\n", e) - case kafka.Error: - fmt.Fprintf(os.Stderr, "%% Error: %v\n", e) - run = false - return e - } - } - } - fmt.Printf("Closing consumer\n") - Consumer.Close() - return nil -} \ No newline at end of file diff --git a/services/kafka/kafka.go b/services/kafka/kafka.go deleted file mode 100644 index 5d9e4bc..0000000 --- a/services/kafka/kafka.go +++ /dev/null @@ -1,47 +0,0 @@ -package kafka - -import ( - "fmt" - "os" - kafkaConfiguration "github.com/siesgstarena/epicentre/config" - "github.com/confluentinc/confluent-kafka-go/kafka" -) - -// Producer for producing Messages -var Producer *kafka.Producer - -// Consumer for consuming Messages -var Consumer *kafka.Consumer - -// LoadKafka Configures Producer & Consumer with provided configuration -func LoadKafka() error { - config := &kafka.ConfigMap{ - "metadata.broker.list": kafkaConfiguration.Config.KafkaBrokerList, - "security.protocol": "SASL_SSL", - "sasl.mechanisms": "SCRAM-SHA-256", - "sasl.username": kafkaConfiguration.Config.KafkaUsername, - "sasl.password": kafkaConfiguration.Config.KafkaPassword, - "group.id": kafkaConfiguration.Config.KafkaGroupID, - "default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"}, - } - - p, err := kafka.NewProducer(config) - if err != nil { - fmt.Printf("Failed to create producer: %s\n", err) - os.Exit(1) - return err - } - Producer = p - fmt.Printf("Created Producer %v\n", p) - - c, err := kafka.NewConsumer(config) - if err != nil { - fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err) - os.Exit(1) - return err - } - Consumer = c - fmt.Printf("Created Consumer %v\n", c) - - return nil -} \ No newline at end of file diff --git a/services/kafka/producer.go b/services/kafka/producer.go deleted file mode 100644 index 1b0db6a..0000000 --- a/services/kafka/producer.go +++ /dev/null @@ -1,32 +0,0 @@ -package kafka - -import ( - "fmt" - kafkaConfiguration "github.com/siesgstarena/epicentre/config" - "github.com/confluentinc/confluent-kafka-go/kafka" -) - -// ProduceMessage This function can be used to send message on the topic -func ProduceMessage(message string) error { - topic := fmt.Sprintf("%sdefault", kafkaConfiguration.Config.KafkaTopicPrefix) - deliveryChan := make(chan kafka.Event) - fmt.Println(message) - for i := 0; i < 10; i++ { - value := fmt.Sprintf("[%d] %s", i+1, message) - err := Producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value)}, deliveryChan) - if err != nil { - return err - } - e := <-deliveryChan - m := e.(*kafka.Message) - if m.TopicPartition.Error != nil { - fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error) - } else { - fmt.Printf("Delivered message to topic %s [%d] at offset %v\n", - *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) - } - fmt.Println(value) - } - close(deliveryChan) - return nil -} \ No newline at end of file