Skip to content

Commit

Permalink
Restart topics streaming before shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelfc committed Sep 9, 2017
1 parent 3157297 commit 7927209
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 99 deletions.
14 changes: 14 additions & 0 deletions marble-core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Changelog
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [1.1.1] - 2017-06-20
### Fixed
- Fixed issue #47: wrong streaming status of topic.

### Added
- Topics streaming before shutdown will be restarted again after startup.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class ApplicationStartup implements ApplicationListener<ApplicationReadyE
@Override
public void onApplicationEvent(final ApplicationReadyEvent event) {
jobService.cleanOldJobs();
topicService.cleanOldStreamingTopics();
topicService.restartStreamingTopics();
return;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ public interface TopicService {

TopicStats getStats(String name) throws InvalidTopicException;

void cleanOldStreamingTopics();
void restartStreamingTopics();

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package org.marble.commons.service;

import java.math.BigInteger;
import java.util.List;

import org.marble.commons.domain.repository.TopicRepository;
import org.marble.commons.exception.InvalidExecutionException;
import org.marble.commons.exception.InvalidTopicException;
import org.marble.commons.model.JobRestResult;
import org.marble.commons.model.TopicStats;
import org.marble.model.domain.model.Job;
import org.marble.model.domain.model.Post;
Expand All @@ -21,121 +23,132 @@
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;

@Service
public class TopicServiceImpl implements TopicService {

private static final Logger log = LoggerFactory.getLogger(TopicServiceImpl.class);
@Autowired
TopicRepository topicRepository;
private static final Logger log = LoggerFactory.getLogger(TopicServiceImpl.class);
@Autowired
TopicRepository topicRepository;

@Autowired
DatastoreService datastoreService;
@Autowired
DatastoreService datastoreService;

@Autowired
JobService jobService;
@Autowired
JobService jobService;

@Autowired
ChartService plotService;
@Autowired
ChartService plotService;

@Autowired
PostService postService;
@Autowired
PostService postService;

@Override
public Topic save(Topic topic) throws InvalidTopicException {
// TODO Modify this in order to update only certain fields (and do not
// overwrite the post)
topic = topicRepository.save(topic);
if (topic == null) {
throw new InvalidTopicException();
}
return topic;
@Override
public Topic save(Topic topic) throws InvalidTopicException {
// TODO Modify this in order to update only certain fields (and do not
// overwrite the post)
topic = topicRepository.save(topic);
if (topic == null) {
throw new InvalidTopicException();
}

@Override
public Topic findOne(String name) throws InvalidTopicException {
Topic topic = topicRepository.findOne(name);
if (topic == null) {
throw new InvalidTopicException();
}
return topic;
return topic;
}

@Override
public Topic findOne(String name) throws InvalidTopicException {
Topic topic = topicRepository.findOne(name);
if (topic == null) {
throw new InvalidTopicException();
}

@Override
public List<Topic> findAll() {
List<Topic> topics = topicRepository.findAll();
return topics;
return topic;
}

@Override
public List<Topic> findAll() {
List<Topic> topics = topicRepository.findAll();
return topics;
}

@Override
public void delete(String name) {
topicRepository.delete(name);
// Remove all the related posts and jobs from the database
postService.deleteByTopicName(name);
jobService.deleteByTopicName(name);
plotService.deleteByTopicName(name);
// TODO remove processed posts
return;
}

@Override
public TopicStats getStats(String name) throws InvalidTopicException {
// This is only to check if exists
Topic topic = topicRepository.findOne(name);
if (topic == null) {
throw new InvalidTopicException();
}

@Override
public void delete(String name) {
topicRepository.delete(name);
// Remove all the related posts and jobs from the database
postService.deleteByTopicName(name);
jobService.deleteByTopicName(name);
plotService.deleteByTopicName(name);
// TODO remove processed posts
return;
TopicStats topicStats = new TopicStats();
topicStats.setTopicName(name);
try {
topicStats.setTotalPostsExtracted(datastoreService.countByTopicId(name, Post.class));
topicStats.setTotalPostsProcessed(datastoreService.countByTopicId(name, ProcessedPost.class));

if (topicStats.getTotalPostsExtracted() > 0) {
Post post = datastoreService.findOneByTopicIdSortBy(name, "createdAt", Sort.Direction.ASC,
Post.class);
topicStats.setOldestPostDate(post.getCreatedAt());
topicStats.setOldestPostId(post.getOriginalId());

post = datastoreService.findOneByTopicIdSortBy(name, "createdAt", Sort.Direction.DESC,
Post.class);
topicStats.setNewestPostDate(post.getCreatedAt());
topicStats.setNewestPostId(post.getOriginalId());

topicStats.setTotalJobs(jobService.countByTopicName(name));
}
} catch (MongoException e) {
log.warn("Exception caught while extracting the topic info.", e);
}

@Override
public TopicStats getStats(String name) throws InvalidTopicException {
// This is only to check if exists
Topic topic = topicRepository.findOne(name);
if (topic == null) {
throw new InvalidTopicException();
return topicStats;
}

@Override
public Long count() {
return topicRepository.count();
}

@Override
public void restartStreamingTopics() {
Pageable page = new PageRequest(0, 100);
Page<Topic> results;
do {
results = topicRepository.findByStreaming(true, page);
for (Topic topic : results.getContent()) {
String msg = "Restarting streaming of topic <" + topic.getName() + ">.";
log.info(msg);
BigInteger executionId;
try {
executionId = jobService.executeStreamer(topic.getName());
msg = "Streaming started with id <" + executionId + ">.";
log.info(msg);
} catch (InvalidTopicException | InvalidExecutionException e) {
log.error("An error occurred while restarting topic <" + topic.getName() + ">", e);
}

TopicStats topicStats = new TopicStats();
topicStats.setTopicName(name);

try {
topicStats.setTotalPostsExtracted(datastoreService.countByTopicId(name, Post.class));
topicStats.setTotalPostsProcessed(datastoreService.countByTopicId(name, ProcessedPost.class));

if (topicStats.getTotalPostsExtracted() > 0) {
Post post = datastoreService.findOneByTopicIdSortBy(name, "createdAt", Sort.Direction.ASC, Post.class);
topicStats.setOldestPostDate(post.getCreatedAt());
topicStats.setOldestPostId(post.getOriginalId());

post = datastoreService.findOneByTopicIdSortBy(name, "createdAt", Sort.Direction.DESC, Post.class);
topicStats.setNewestPostDate(post.getCreatedAt());
topicStats.setNewestPostId(post.getOriginalId());

topicStats.setTotalJobs(jobService.countByTopicName(name));
}
} catch (MongoException e) {
log.warn("Exception caught while extracting the topic info.", e);
Thread.sleep(10000L);
} catch (InterruptedException e) {
log.error("A weird error was found while trying to sleep.", e);
}
return topicStats;
}
}
page = page.next();
} while (results.hasNext());

@Override
public Long count() {
return topicRepository.count();
}

@Override
public void cleanOldStreamingTopics() {
Pageable page = new PageRequest(0, 100);
Page<Topic> results;
do {
results = topicRepository.findByStreaming(true, page);
for (Topic topic : results.getContent()) {
String msg = "Marking topic <" + topic.getName() + "> as not streaming.";
log.info(msg);
topic.setStreaming(false);
try {
topic = this.save(topic);
} catch (InvalidTopicException e) {
log.error("An error occurred while persisting the topic.");
}

}
page = page.next();
} while (results.hasNext());

return;
}
return;
}

}

0 comments on commit 7927209

Please sign in to comment.