From 3157297f21bb75928b93ed6fe3d7a750405503b0 Mon Sep 17 00:00:00 2001 From: Miguel Fernandes Date: Sat, 9 Sep 2017 10:44:43 +0200 Subject: [PATCH 1/2] Fixes #47 --- .../commons/executor/streamer/TwitterStreamerExecutor.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/marble-core/src/main/java/org/marble/commons/executor/streamer/TwitterStreamerExecutor.java b/marble-core/src/main/java/org/marble/commons/executor/streamer/TwitterStreamerExecutor.java index fa28b31..ff5634f 100644 --- a/marble-core/src/main/java/org/marble/commons/executor/streamer/TwitterStreamerExecutor.java +++ b/marble-core/src/main/java/org/marble/commons/executor/streamer/TwitterStreamerExecutor.java @@ -257,9 +257,6 @@ public void stopStreaming(Job job) { query = query.locations(locations); } - topic.setStreaming(Boolean.TRUE); - topicService.save(topic); - twitterStream.filter(query); msg = "Stop operation finished."; log.info(msg); From 792720933cb8efa6c7e715e696d187881ad46dd9 Mon Sep 17 00:00:00 2001 From: Miguel Fernandes Date: Sat, 9 Sep 2017 10:45:01 +0200 Subject: [PATCH 2/2] Restart topics streaming before shutdown --- marble-core/CHANGELOG.md | 14 ++ .../commons/config/ApplicationStartup.java | 2 +- .../marble/commons/service/TopicService.java | 2 +- .../commons/service/TopicServiceImpl.java | 207 ++++++++++-------- 4 files changed, 126 insertions(+), 99 deletions(-) create mode 100644 marble-core/CHANGELOG.md diff --git a/marble-core/CHANGELOG.md b/marble-core/CHANGELOG.md new file mode 100644 index 0000000..c021ec0 --- /dev/null +++ b/marble-core/CHANGELOG.md @@ -0,0 +1,14 @@ +# Changelog +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) +and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). + +## [Unreleased] + +## [1.1.1] - 2017-06-20 +### Fixed +- Fixed issue #47: wrong streaming status of topic. + +### Added +- Topics streaming before shutdown will be restarted again after startup. \ No newline at end of file diff --git a/marble-core/src/main/java/org/marble/commons/config/ApplicationStartup.java b/marble-core/src/main/java/org/marble/commons/config/ApplicationStartup.java index c80300f..504583c 100644 --- a/marble-core/src/main/java/org/marble/commons/config/ApplicationStartup.java +++ b/marble-core/src/main/java/org/marble/commons/config/ApplicationStartup.java @@ -19,7 +19,7 @@ public class ApplicationStartup implements ApplicationListener findAll() { - List topics = topicRepository.findAll(); - return topics; + return topic; + } + + @Override + public List findAll() { + List topics = topicRepository.findAll(); + return topics; + } + + @Override + public void delete(String name) { + topicRepository.delete(name); + // Remove all the related posts and jobs from the database + postService.deleteByTopicName(name); + jobService.deleteByTopicName(name); + plotService.deleteByTopicName(name); + // TODO remove processed posts + return; + } + + @Override + public TopicStats getStats(String name) throws InvalidTopicException { + // This is only to check if exists + Topic topic = topicRepository.findOne(name); + if (topic == null) { + throw new InvalidTopicException(); } - @Override - public void delete(String name) { - topicRepository.delete(name); - // Remove all the related posts and jobs from the database - postService.deleteByTopicName(name); - jobService.deleteByTopicName(name); - plotService.deleteByTopicName(name); - // TODO remove processed posts - return; + TopicStats topicStats = new TopicStats(); + topicStats.setTopicName(name); + try { + topicStats.setTotalPostsExtracted(datastoreService.countByTopicId(name, Post.class)); + topicStats.setTotalPostsProcessed(datastoreService.countByTopicId(name, ProcessedPost.class)); + + if (topicStats.getTotalPostsExtracted() > 0) { + Post post = datastoreService.findOneByTopicIdSortBy(name, "createdAt", Sort.Direction.ASC, + Post.class); + topicStats.setOldestPostDate(post.getCreatedAt()); + topicStats.setOldestPostId(post.getOriginalId()); + + post = datastoreService.findOneByTopicIdSortBy(name, "createdAt", Sort.Direction.DESC, + Post.class); + topicStats.setNewestPostDate(post.getCreatedAt()); + topicStats.setNewestPostId(post.getOriginalId()); + + topicStats.setTotalJobs(jobService.countByTopicName(name)); + } + } catch (MongoException e) { + log.warn("Exception caught while extracting the topic info.", e); } - - @Override - public TopicStats getStats(String name) throws InvalidTopicException { - // This is only to check if exists - Topic topic = topicRepository.findOne(name); - if (topic == null) { - throw new InvalidTopicException(); + return topicStats; + } + + @Override + public Long count() { + return topicRepository.count(); + } + + @Override + public void restartStreamingTopics() { + Pageable page = new PageRequest(0, 100); + Page results; + do { + results = topicRepository.findByStreaming(true, page); + for (Topic topic : results.getContent()) { + String msg = "Restarting streaming of topic <" + topic.getName() + ">."; + log.info(msg); + BigInteger executionId; + try { + executionId = jobService.executeStreamer(topic.getName()); + msg = "Streaming started with id <" + executionId + ">."; + log.info(msg); + } catch (InvalidTopicException | InvalidExecutionException e) { + log.error("An error occurred while restarting topic <" + topic.getName() + ">", e); } - - TopicStats topicStats = new TopicStats(); - topicStats.setTopicName(name); + try { - topicStats.setTotalPostsExtracted(datastoreService.countByTopicId(name, Post.class)); - topicStats.setTotalPostsProcessed(datastoreService.countByTopicId(name, ProcessedPost.class)); - - if (topicStats.getTotalPostsExtracted() > 0) { - Post post = datastoreService.findOneByTopicIdSortBy(name, "createdAt", Sort.Direction.ASC, Post.class); - topicStats.setOldestPostDate(post.getCreatedAt()); - topicStats.setOldestPostId(post.getOriginalId()); - - post = datastoreService.findOneByTopicIdSortBy(name, "createdAt", Sort.Direction.DESC, Post.class); - topicStats.setNewestPostDate(post.getCreatedAt()); - topicStats.setNewestPostId(post.getOriginalId()); - - topicStats.setTotalJobs(jobService.countByTopicName(name)); - } - } catch (MongoException e) { - log.warn("Exception caught while extracting the topic info.", e); + Thread.sleep(10000L); + } catch (InterruptedException e) { + log.error("A weird error was found while trying to sleep.", e); } - return topicStats; - } + } + page = page.next(); + } while (results.hasNext()); - @Override - public Long count() { - return topicRepository.count(); - } - - @Override - public void cleanOldStreamingTopics() { - Pageable page = new PageRequest(0, 100); - Page results; - do { - results = topicRepository.findByStreaming(true, page); - for (Topic topic : results.getContent()) { - String msg = "Marking topic <" + topic.getName() + "> as not streaming."; - log.info(msg); - topic.setStreaming(false); - try { - topic = this.save(topic); - } catch (InvalidTopicException e) { - log.error("An error occurred while persisting the topic."); - } - - } - page = page.next(); - } while (results.hasNext()); - - return; - } + return; + } }