This repository has been archived by the owner on Mar 24, 2022. It is now read-only.

Commit

Merge pull request #2 from fasten-project/rest
REST support
wzorgdrager authored Feb 5, 2021
2 parents 7184a83 + 885f945 commit 123765b
Showing 13 changed files with 416 additions and 75 deletions.
9 changes: 7 additions & 2 deletions README.md
@@ -4,7 +4,7 @@ This application crawls from [Maven Central Incremental Index Repository](https:
Running this application will follow this repository and output the __unique artifacts__ released on Maven Central.
Currently, Maven Central releases a new (incremental) index __every week__.

Several outputs exist including Kafka. REST API support will be added soon. Moreover, a checkpointing mechanism is added to support persistence across restarts.
Several outputs exist including Kafka and HTTP support. Moreover, a checkpointing mechanism is added to support persistence across restarts.
More specifically, the `checkpointDir` stores an `INDEX.index` file, where `INDEX` is the _next_ index to crawl. E.g. when `800.index` is stored, the crawler will resume crawling from index 800 (_inclusive_).
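
To make that contract concrete, here is a minimal sketch (an illustration under the stated assumptions, not the project's actual code) of resolving the next index from `checkpointDir`:

```java
import java.io.File;

// Sketch only: resolve the next index to crawl, assuming checkpointDir
// contains at most one "<INDEX>.index" marker file.
class CheckpointSketch {
    static int nextIndex(File checkpointDir, int fallback) {
        String[] markers = checkpointDir.list((dir, name) -> name.endsWith(".index"));
        if (markers == null || markers.length == 0) return fallback;
        // "800.index" -> 800; crawling resumes from (and including) this index.
        return Integer.parseInt(markers[0].substring(0, markers[0].length() - ".index".length()));
    }
}
```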

## Usage
@@ -20,8 +20,13 @@ usage: IncrementalMavenCrawler
crawled index. Used for recovery on
crash or restart. Optional.
-kb,--kafka_brokers <brokers> Kafka brokers to connect with. I.e.
broker1:port,broker2:port,... Optional.
broker1:port,broker2:port,...
Required for Kafka output.
-kt,--kafka_topic <topic> Kafka topic to produce to.
Required for Kafka output.
-re,--rest_endpoint <url> HTTP endpoint to post crawled batches to.
Required for Rest output.
```
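
For example, a REST-backed run might be started like this (the jar name and endpoint URL are placeholders; the `--output`, `--batch_size` and `--rest_endpoint` long option names are taken from the argument parsing shown further below):

```
java -jar incremental-maven-crawler.jar \
  --output rest \
  --rest_endpoint http://localhost:8080/artifacts \
  --batch_size 50
```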

### Outputs
6 changes: 5 additions & 1 deletion pom.xml
@@ -119,7 +119,11 @@
<version>1.10.19</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
</dependencies>

<build>
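
The new `httpclient` dependency is presumably what backs the REST output (its source is not expanded in this view). For reference, a minimal, hypothetical POST with HttpClient 4.5.x (a sketch, not the project's `RestOutput`) looks like this:

```java
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

// Hypothetical example: POST a JSON batch and report success on any 2xx status.
class HttpPostSketch {
    static boolean postBatch(String endpoint, String jsonBody) throws Exception {
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            HttpPost post = new HttpPost(endpoint);
            post.setEntity(new StringEntity(jsonBody, ContentType.APPLICATION_JSON));
            try (CloseableHttpResponse response = client.execute(post)) {
                int status = response.getStatusLine().getStatusCode();
                return status >= 200 && status < 300;
            }
        }
    }
}
```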
56 changes: 38 additions & 18 deletions src/main/java/eu/fasten/crawler/CrawlIndex.java
@@ -99,33 +99,28 @@ public List<IndexCreator> getIndexCreators()
* @param output the class to output to (e.g. Kafka).
* @param batchSize the batch size to send to the output.
*/
public void crawlAndSend(Output output, int batchSize) {
public boolean crawlAndSend(Output output, int batchSize) {
nonUnique = 0;
Set<MavenArtifact> artifactSet = new HashSet<>();
HashSet<MavenArtifact> artifactSet = new HashSet<>();

// Setup output.
output.open();

IndexDataReader.IndexDataReadVisitor visitor = (doc) -> {
MavenArtifact artifact = MavenArtifact.fromDocument(doc, context);
if (artifact == null) {
logger.warn("Couldn't construct artifact info for document: " + doc.toString() + ". We will skip it.");
return;
}

if (artifactSet.contains(artifact)) {
nonUnique += 1;
} else {
artifactSet.add(artifact);
}
};
IndexDataReader.IndexDataReadVisitor visitor = setupUniqueVisitor(artifactSet);

try {
IndexDataReader.IndexDataReadResult result = reader.readIndex(visitor, context);

// Send to output.
final List<List<MavenArtifact>> batchedLists = Lists.partition(Lists.newArrayList(artifactSet), batchSize);
batchedLists.forEach((l) -> output.send(l));
for (List<MavenArtifact> artifacts : batchedLists) {
boolean res = output.send(artifacts);

if (!res) {
logger.error("Failed sending batch to ouput for index " + index + ". Exiting current crawl session.");
return false;
}
}

// Flush and close output.
output.flush();
@@ -137,11 +132,36 @@ public void crawlAndSend(Output output, int batchSize) {
logger.info("Unique documents: " + artifactSet.size());
logger.info("Total documents: " + result.getDocumentCount());
} catch (IOException e) {
logger.error("IOException while reading from the index", e);
throw new RuntimeException("Now exiting due to IOExcepton.");
logger.error("IOException while reading from the index. " + index + ". Exiting current crawl session.", e);
return false;
} finally {
nonUnique = 0;
}

return true;
}

/**
* Sets up an IndexDataReadVisitor that fills a set with unique artifacts.
* @param artifactSet reference to the set which will be filled with unique artifacts.
* @return the visitor.
*/
public IndexDataReader.IndexDataReadVisitor setupUniqueVisitor(HashSet<MavenArtifact> artifactSet) {
IndexDataReader.IndexDataReadVisitor visitor = (doc) -> {
MavenArtifact artifact = MavenArtifact.fromDocument(doc, context);
if (artifact == null) {
logger.warn("Couldn't construct artifact info for document: " + doc.toString() + ". We will skip it.");
return;
}

if (artifactSet.contains(artifact)) {
nonUnique += 1;
} else {
artifactSet.add(artifact);
}
};

return visitor;
}

}
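
For context, the batching in `crawlAndSend` relies on Guava's `Lists.partition`, which splits a list into consecutive sublists of at most the given size. A quick standalone illustration:

```java
import com.google.common.collect.Lists;
import java.util.Arrays;

class PartitionDemo {
    public static void main(String[] args) {
        // Prints [[1, 2], [3, 4], [5]]; the final batch may be smaller.
        System.out.println(Lists.partition(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```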
58 changes: 36 additions & 22 deletions src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java
@@ -1,8 +1,6 @@
package eu.fasten.crawler;

import eu.fasten.crawler.output.KafkaOutput;
import eu.fasten.crawler.output.Output;
import eu.fasten.crawler.output.StdOutput;
import eu.fasten.crawler.output.*;
import org.apache.commons.cli.*;
import org.codehaus.plexus.util.FileUtils;
import org.slf4j.Logger;
@@ -76,15 +74,15 @@ public class IncrementalMavenCrawler implements Runnable {
.desc("Kafka brokers to connect with. I.e. broker1:port,broker2:port,... Optional.")
.build();

public static void main(String[] args) {
options.addOption(optStartIndex);
options.addOption(optBatchSize);
options.addOption(optOutputType);
options.addOption(optCrawlInterval);
options.addOption(optCheckpointDir);
options.addOption(optKafkaTopic);
options.addOption(optKafkaBrokers);
static Option optRestEndpoint = Option.builder("re")
.longOpt("rest_endpoint")
.hasArg()
.argName("url")
.desc("HTTP endpoint to post crawled batches to.")
.build();

public static void main(String[] args) {
addOptions();
CommandLineParser parser = new DefaultParser();

Properties properties;
@@ -96,23 +94,29 @@ public static void main(String[] args) {
}

// Setup arguments for crawler.
Output output = new StdOutput();
int batchSize = Integer.parseInt(properties.getProperty("batch_size"));
int startIndex = Integer.parseInt(properties.getProperty("index"));
int interval = Integer.parseInt(properties.getProperty("interval"));
String checkpointDir = properties.getProperty("checkpoint_dir");

// Setup Kafka.
if (properties.get("output").equals("kafka")) {
output = new KafkaOutput(properties.getProperty("kafka_topic"), properties.getProperty("kafka_brokers"), batchSize);
}
Output output = OutputFactory.getOutput(properties.getProperty("output"), properties);

// Start crawler and execute it with an interval.
IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(startIndex, batchSize, output, checkpointDir);
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
service.scheduleAtFixedRate(crawler, 0, interval, TimeUnit.HOURS);
}

public static void addOptions() {
options.addOption(optStartIndex);
options.addOption(optBatchSize);
options.addOption(optOutputType);
options.addOption(optCrawlInterval);
options.addOption(optCheckpointDir);
options.addOption(optKafkaTopic);
options.addOption(optKafkaBrokers);
options.addOption(optRestEndpoint);
}

/**
* Verifies and stores arguments in a Properties instance.
* @param cmd the parsed command line arguments.
@@ -126,13 +130,18 @@ public static Properties verifyAndParseArguments(CommandLine cmd) throws ParseEx
throw new ParseException("Configured output to be Kafka, but no `kafka_topic` or `kafka_brokers` have been configured.");
}

if (cmd.getOptionValue("output").equals("rest") && !(cmd.hasOption("rest_endpoint"))) {
throw new ParseException("Configured output to be Rest, but no `rest_endpoint` has been configured.");
}

props.setProperty("index", cmd.getOptionValue("start_index", "0"));
props.setProperty("batch_size", cmd.getOptionValue("batch_size", "50"));
props.setProperty("output", cmd.getOptionValue("output", "std"));
props.setProperty("interval", cmd.getOptionValue("interval", "1"));
props.setProperty("checkpoint_dir", cmd.getOptionValue("checkpoint_dir", ""));
props.setProperty("kafka_topic", cmd.getOptionValue("kafka_topic", ""));
props.setProperty("kafka_brokers", cmd.getOptionValue("kafka_brokers", ""));
props.setProperty("rest_endpoint", cmd.getOptionValue("rest_endpoint", ""));

return props;
}
@@ -163,6 +172,8 @@ public IncrementalMavenCrawler(int startIndex, int batchSize, Output output, Str
if (this.index > startIndex) {
logger.info("Found (checkpointed) index in " + checkpointDir + ". Will start crawling from index " + this.index);
}

logger.info("Starting IncrementalMavenCrawler with index: " + this.index + ", batch size: " + batchSize + " and output " + output.getClass().getSimpleName() + ".");
}

/**
@@ -211,15 +222,18 @@ public void run() {

// Setup crawler.
CrawlIndex crawlIndex = new CrawlIndex(index, indexFile);
crawlIndex.crawlAndSend(output, batchSize);
boolean success = crawlIndex.crawlAndSend(output, batchSize);

// Delete the index file.
indexFile.delete();

logger.info("Index " + index + " successfully crawled.");

// Update (and increment) the index.
updateIndex();
if (success) {
logger.info("Index " + index + " successfully crawled.");
// Update (and increment) the index.
updateIndex();
} else {
logger.warn("Failed crawling index " + index + ". Will retry on next interval.");
}
}

/**
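
`updateIndex()` itself is collapsed in this view. Based on the README's description of the checkpoint format, its effect is roughly the following (a sketch under that assumption, not the committed code):

```java
import java.io.File;
import java.io.IOException;

// Sketch: advance the checkpoint by writing a "<nextIndex>.index" marker,
// removing any previous marker first (assumes the README's naming scheme).
class CheckpointWriteSketch {
    static void checkpoint(String checkpointDir, int nextIndex) throws IOException {
        if (checkpointDir.isEmpty()) return; // checkpointing is optional
        File dir = new File(checkpointDir);
        File[] old = dir.listFiles((d, name) -> name.endsWith(".index"));
        if (old != null) for (File f : old) f.delete();
        new File(dir, nextIndex + ".index").createNewFile(); // e.g. "801.index"
    }
}
```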
11 changes: 8 additions & 3 deletions src/main/java/eu/fasten/crawler/output/KafkaOutput.java
@@ -53,20 +53,25 @@ public void flush() {
* @param artifact the artifacts (expected to be of batch size).
*/
@Override
public void send(List<MavenArtifact> artifact) {
public boolean send(List<MavenArtifact> artifact) {
List<ProducerRecord<String, String>> records = artifact
.stream()
.map((x) -> new ProducerRecord<String, String>(topic, null, x.getTimestamp(), null, x.toString()))
.collect(Collectors.toList());

records.stream().map((r) -> producer.send(r)).parallel().forEach((f) -> {
boolean result = records.stream().map((r) -> producer.send(r)).parallel().map((f) -> {
try {
f.get();
return true;
} catch (InterruptedException e) {
e.printStackTrace();
return false;
} catch (ExecutionException e) {
e.printStackTrace();
return false;
}
});
}).noneMatch((x) -> x == false);

return result;
}
}
13 changes: 7 additions & 6 deletions src/main/java/eu/fasten/crawler/output/Output.java
@@ -8,13 +8,14 @@
public interface Output {

/** Helper methods for constructing and cleaning up the output instance. **/
void open();
void close();
void flush();
default void open() {}
default void close() {}
default void flush() {}

/** Send records to output. **/
default void send(MavenArtifact artifact) {
send(Arrays.asList(artifact));
default boolean send(MavenArtifact artifact) {
return send(Arrays.asList(artifact));
}
void send(List<MavenArtifact> artifact);

boolean send(List<MavenArtifact> artifact);
}
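
With `open()`, `close()` and `flush()` defaulting to no-ops and `send` now returning a `boolean`, a minimal output only has to implement the batch method. An illustrative, hypothetical implementation (imports of `Output` and `MavenArtifact` omitted, since their packages are not shown here):

```java
import java.util.List;

// Hypothetical minimal Output: log each artifact and report success,
// so the crawler will checkpoint the index after the batch.
class LoggingOutput implements Output {
    @Override
    public boolean send(List<MavenArtifact> artifacts) {
        artifacts.forEach(a -> System.out.println("crawled: " + a));
        return true;
    }
}
```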
17 changes: 17 additions & 0 deletions src/main/java/eu/fasten/crawler/output/OutputFactory.java
@@ -0,0 +1,17 @@
package eu.fasten.crawler.output;

import java.util.Properties;

public class OutputFactory {

public static Output getOutput(String outputName, Properties properties) {
switch (outputName) {
case "kafka":
return new KafkaOutput(properties.getProperty("kafka_topic"), properties.getProperty("kafka_brokers"), Integer.parseInt(properties.getProperty("batch_size")));
case "rest":
return new RestOutput(properties.getProperty("rest_endpoint"));
default:
return new StdOutput();
}
}
}
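
Note that unrecognized output names silently fall back to `StdOutput` rather than failing fast. Wiring it up from `main()` reduces to something like this (the endpoint URL is a placeholder; the demo class is assumed to live in the same package as `OutputFactory`):

```java
import java.util.Properties;

class OutputFactoryDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("rest_endpoint", "http://localhost:8080/artifacts"); // placeholder URL
        Output output = OutputFactory.getOutput("rest", props); // returns a RestOutput
        System.out.println(output.getClass().getSimpleName());
    }
}
```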