From 376879140357d997b7d1aff6cfbb7c762b46a003 Mon Sep 17 00:00:00 2001 From: JoranVanBelle Date: Wed, 8 Jan 2025 10:15:11 +0100 Subject: [PATCH] feat(admin): remove topics --- src/Kafka/Admin.hs | 57 ++++++++++++++++++++++++++----- src/Kafka/Internal/RdKafka.chs | 56 +++++++++++++++--------------- tests-it/Kafka/IntegrationSpec.hs | 33 ++++++++++++++++++ 3 files changed, 109 insertions(+), 37 deletions(-) diff --git a/src/Kafka/Admin.hs b/src/Kafka/Admin.hs index 2f2baa0..50f1ab6 100644 --- a/src/Kafka/Admin.hs +++ b/src/Kafka/Admin.hs @@ -2,20 +2,16 @@ module Kafka.Admin( module X , newKAdmin , createTopic +, deleteTopic , closeKAdmin ) where import Control.Monad -import Control.Monad.Trans.Class -import Control.Monad.Trans.Maybe import Control.Monad.IO.Class import Data.Text -import Data.Maybe -import Data.Bifunctor import Data.List.NonEmpty import qualified Data.List.NonEmpty as NEL import qualified Data.Text as T -import qualified Data.Set as S import Kafka.Internal.RdKafka import Kafka.Internal.Setup @@ -53,11 +49,27 @@ createTopic kAdmin topic = liftIO $ do Right _ -> do pure $ Right $ topicName topic +--- DELETE TOPIC --- +deleteTopic :: KAdmin + -> TopicName + -> IO (Either KafkaError TopicName) +deleteTopic kAdmin topic = liftIO $ do + let kafkaPtr = getRdKafka kAdmin + queue <- newRdKafkaQueue kafkaPtr + opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny + + topicRes <- withOldTopic topic $ \topic' -> rdKafkaDeleteTopics kafkaPtr [topic'] opts queue + case topicRes of + Left err -> do + pure $ Left (NEL.head err) + Right _ -> do + pure $ Right topic + withNewTopic :: NewTopic -> (RdKafkaNewTopicTPtr -> IO a) -> IO (Either (NonEmpty KafkaError) a) withNewTopic t transform = do - mkNewTopicRes <- mkNewTopic t topicPtr + mkNewTopicRes <- mkNewTopic t newTopicPtr case mkNewTopicRes of Left err -> do return $ Left err @@ -65,19 +77,46 @@ withNewTopic t transform = do res <- transform topic return $ Right res -topicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr) -topicPtr topic = do +withOldTopic :: TopicName + -> (RdKafkaDeleteTopicTPtr -> IO a) + -> IO (Either (NonEmpty KafkaError) a) +withOldTopic tName transform = do + rmOldTopicRes <- rmOldTopic tName oldTopicPtr + case rmOldTopicRes of + Left err -> do + return $ Left err + Right topic -> do + res <- transform topic + return $ Right res + +newTopicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr) +newTopicPtr topic = do ptrRes <- newRdKafkaNewTopic (unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic) case ptrRes of Left str -> pure $ Left (KafkaError $ T.pack str) Right ptr -> pure $ Right ptr +oldTopicPtr :: TopicName -> IO (Either KafkaError RdKafkaDeleteTopicTPtr) +oldTopicPtr tName = do + res <- newRdKafkaDeleteTopic $ unpack . unTopicName $ tName + case res of + Left str -> pure $ Left (KafkaError $ T.pack str) + Right ptr -> pure $ Right ptr + mkNewTopic :: NewTopic -> (NewTopic -> IO (Either KafkaError a)) -> IO (Either (NonEmpty KafkaError) a) mkNewTopic topic create = do res <- create topic case res of - Left err -> pure $ Left (Data.List.NonEmpty.singleton err) + Left err -> pure $ Left (NEL.singleton err) Right resource -> pure $ Right resource +rmOldTopic :: TopicName + -> (TopicName -> IO (Either KafkaError a)) + -> IO (Either (NonEmpty KafkaError) a) +rmOldTopic tName remove = do + res <- remove tName + case res of + Left err -> pure $ Left (NEL.singleton err) + Right resource -> pure $ Right resource diff --git a/src/Kafka/Internal/RdKafka.chs b/src/Kafka/Internal/RdKafka.chs index 3c2bb11..9ec9f52 100644 --- a/src/Kafka/Internal/RdKafka.chs +++ b/src/Kafka/Internal/RdKafka.chs @@ -17,7 +17,7 @@ import Foreign.Storable (Storable(..)) import Foreign.Ptr (Ptr, FunPtr, castPtr, nullPtr) import Foreign.ForeignPtr (FinalizerPtr, addForeignPtrFinalizer, newForeignPtr_, withForeignPtr, ForeignPtr, newForeignPtr) import Foreign.C.Error (Errno(..), getErrno) -import Foreign.C.String (CString, newCString, withCAString, peekCAString, peekCString) +import Foreign.C.String (CString, newCString, withCString, withCAString, peekCAString, peekCString) import Foreign.C.Types (CFile, CInt(..), CSize, CChar, CLong) import System.IO (Handle, stdin, stdout, stderr) import System.Posix.IO (handleToFd) @@ -1203,6 +1203,7 @@ newRdKafkaNewTopic topicName topicPartitions topicReplicationFactor = do then peekCString ptr >>= pure . Left else addForeignPtrFinalizer rdKafkaNewTopicDestroyFinalizer res >> pure (Right res) +--- Create topic rdKafkaCreateTopic :: RdKafkaTPtr -> RdKafkaNewTopicTPtr -> RdKafkaAdminOptionsTPtr @@ -1214,34 +1215,33 @@ rdKafkaCreateTopic kafkaPtr topic opts queue = do withForeignPtrsArrayLen topics $ \tLen tPtr -> do {#call rd_kafka_CreateTopics#} kPtr tPtr (fromIntegral tLen) oPtr qPtr -rdKafkaEventCreateTopicsResult :: RdKafkaEventTPtr -> IO (Maybe RdKafkaTopicResultTPtr) -rdKafkaEventCreateTopicsResult evtPtr = - withForeignPtr evtPtr $ \evtPtr' -> do - res <- {#call rd_kafka_event_CreateTopics_result#} (castPtr evtPtr') +--- Delete topic +foreign import ccall unsafe "rdkafka.h &rd_kafka_DeleteTopic_destroy" + rdKafkaDeleteTopicDestroy :: FinalizerPtr RdKafkaDeleteTopicT + +data RdKafkaDeleteTopicT +{#pointer *rd_kafka_DeleteTopic_t as RdKafkaDeleteTopicTPtr foreign -> RdKafkaDeleteTopicT #} + +data RdKafkaDeleteTopicsResultT +{#pointer *rd_kafka_DeleteTopics_result_t as RdKafkaDeleteTopicResultTPtr foreign -> RdKafkaDeleteTopicsResultT #} + +newRdKafkaDeleteTopic :: String -> IO (Either String RdKafkaDeleteTopicTPtr) +newRdKafkaDeleteTopic topicNameStr = + withCString topicNameStr $ \topicNameStrPtr -> do + res <- {#call rd_kafka_DeleteTopic_new#} topicNameStrPtr if (res == nullPtr) - then pure Nothing - else Just <$> newForeignPtr_ (castPtr res) - -rdKafkaCreateTopicsResultTopics :: RdKafkaTopicResultTPtr - -> IO [Either (String, RdKafkaRespErrT, String) String] -rdKafkaCreateTopicsResultTopics tRes = - withForeignPtr tRes $ \tRes' -> - alloca $ \sPtr -> do - res <- {#call rd_kafka_CreateTopics_result_topics#} (castPtr tRes') sPtr - size <- peekIntConv sPtr - array <- peekArray size res - traverse unpackRdKafkaTopicResult array - -unpackRdKafkaTopicResult :: Ptr RdKafkaTopicResultT - -> IO (Either (String, RdKafkaRespErrT, String) String) -unpackRdKafkaTopicResult resPtr = do - name <- {#call rd_kafka_topic_result_name#} resPtr >>= peekCString - err <- {#call rd_kafka_topic_result_error#} resPtr - case cIntToEnum err of - respErr -> do - errMsg <- {#call rd_kafka_topic_result_error_string#} resPtr >>= peekCString - pure $ Left (name, respErr, errMsg) - RdKafkaRespErrNoError -> pure $ Right name + then return $ Left $ "Something went wrong while deleting topic " ++ topicNameStr + else Right <$> newForeignPtr rdKafkaDeleteTopicDestroy res + +rdKafkaDeleteTopics :: RdKafkaTPtr + -> [RdKafkaDeleteTopicTPtr] + -> RdKafkaAdminOptionsTPtr + -> RdKafkaQueueTPtr + -> IO () +rdKafkaDeleteTopics kafkaPtr topics opts queue = do + withForeignPtrs kafkaPtr opts queue $ \kPtr oPtr qPtr -> + withForeignPtrsArrayLen topics $ \tLen tPtr -> do + {#call rd_kafka_DeleteTopics#} kPtr tPtr (fromIntegral tLen) oPtr qPtr -- Marshall / Unmarshall enumToCInt :: Enum a => a -> CInt diff --git a/tests-it/Kafka/IntegrationSpec.hs b/tests-it/Kafka/IntegrationSpec.hs index 1c21d64..0904009 100644 --- a/tests-it/Kafka/IntegrationSpec.hs +++ b/tests-it/Kafka/IntegrationSpec.hs @@ -177,6 +177,8 @@ spec = do describe "Kafka.Admin.Spec" $ do let topicName = addRandomChars "admin.topic.created." 5 + topicsMVar <- runIO newEmptyMVar + specWithAdmin "Create topic" $ do it "should create a new topic" $ \(admin :: KAdmin) -> do @@ -184,6 +186,37 @@ spec = do let newTopic = mkNewTopic (TopicName ( T.pack(tName) )) result <- createTopic admin newTopic result `shouldSatisfy` isRight + + specWithConsumer "Read all topics" consumerProps $ do + + it "should return all the topics" $ \(consumer :: KafkaConsumer) -> do + res <- allTopicsMetadata consumer (Timeout 1000) + res `shouldSatisfy` isRight + let filterUserTopics m = m { kmTopics = filter (\t -> topicType (tmTopicName t) == User) (kmTopics m) } + let res' = fmap filterUserTopics res + length . kmBrokers <$> res' `shouldBe` Right 1 + + putStrLn $ either + (const "Nothing to show") + (unlines . map (T.unpack . unTopicName . tmTopicName) . kmTopics) + res' + + let topics = either (const []) (map tmTopicName . kmTopics) res' + putMVar topicsMVar topics + + let topicsLen = either (const 0) (length . kmTopics) res' + let hasTopic = either (const False) (any (\t -> tmTopicName t == testTopic) . kmTopics) res' + + topicsLen `shouldSatisfy` (>0) + hasTopic `shouldBe` True + + specWithAdmin "Remove topics" $ do + + it "should delete all the topics currently existing" $ \(admin ::KAdmin) -> do + topics <- takeMVar topicsMVar + forM_ topics $ \topic -> do + result <- deleteTopic admin topic + result `shouldSatisfy` isRight ---------------------------------------------------------------------------------------------------------------- data ReadState = Skip | Read