From d05fe98c1859fe8d756ebe2355c657144d00a6ef Mon Sep 17 00:00:00 2001 From: Wouter Zorgdrager Date: Fri, 5 Feb 2021 11:58:45 +0100 Subject: [PATCH 01/10] Start working on rest support --- README.md | 2 +- pom.xml | 6 ++- .../java/eu/fasten/crawler/CrawlIndex.java | 17 ++++++-- .../crawler/IncrementalMavenCrawler.java | 13 +++--- .../eu/fasten/crawler/output/KafkaOutput.java | 11 +++-- .../java/eu/fasten/crawler/output/Output.java | 12 +++--- .../eu/fasten/crawler/output/RestOutput.java | 42 +++++++++++++++++++ .../eu/fasten/crawler/output/StdOutput.java | 12 +----- 8 files changed, 85 insertions(+), 30 deletions(-) create mode 100644 src/main/java/eu/fasten/crawler/output/RestOutput.java diff --git a/README.md b/README.md index e70bc20..c004ca3 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ This application crawls from [Maven Central Incremental Index Repository](https: Running this application will follow this repository and outputs the __unique artifacts__ released on Maven central. Currently, Maven Central releases a new (incremental) index __every week__. -Several outputs exist including Kafka. REST API support will be added soon. Moreover, a checkpointing mechanism is added to support persistence across restarts. +Several outputs exist including Kafka and HTTP support. Moreover, a checkpointing mechanism is added to support persistence across restarts. More specifically, the `checkpointDir` stores an `INDEX.index` file where the `INDEX` is the _next_ index to crawl. E.g. when `800.index` is stored, the crawler will start crawling _including_ index 800. ## Usage diff --git a/pom.xml b/pom.xml index 035e81f..48cb5a4 100644 --- a/pom.xml +++ b/pom.xml @@ -119,7 +119,11 @@ 1.10.19 test - + + org.apache.httpcomponents + httpclient + 4.5.13 + diff --git a/src/main/java/eu/fasten/crawler/CrawlIndex.java b/src/main/java/eu/fasten/crawler/CrawlIndex.java index 17afa0e..8b8a1f5 100644 --- a/src/main/java/eu/fasten/crawler/CrawlIndex.java +++ b/src/main/java/eu/fasten/crawler/CrawlIndex.java @@ -99,7 +99,7 @@ public List getIndexCreators() * @param output the class to output to (E.g. Kafka). * @param batchSize the batch size to send to the output. */ - public void crawlAndSend(Output output, int batchSize) { + public boolean crawlAndSend(Output output, int batchSize) { nonUnique = 0; Set artifactSet = new HashSet<>(); @@ -125,7 +125,14 @@ public void crawlAndSend(Output output, int batchSize) { // Send to output. final List> batchedLists = Lists.partition(Lists.newArrayList(artifactSet), batchSize); - batchedLists.forEach((l) -> output.send(l)); + for (List artifacts : batchedLists) { + boolean res = output.send(artifacts); + + if (!res) { + logger.error("Failed sending batch to ouput for index " + index + ". Exiting current crawl session."); + return false; + } + } // Flush and close output. output.flush(); @@ -137,11 +144,13 @@ public void crawlAndSend(Output output, int batchSize) { logger.info("Unique documents: " + artifactSet.size()); logger.info("Total documents: " + result.getDocumentCount()); } catch (IOException e) { - logger.error("IOException while reading from the index", e); - throw new RuntimeException("Now exiting due to IOExcepton."); + logger.error("IOException while reading from the index. " + index + ". Exiting current crawl session.", e); + return false; } finally { nonUnique = 0; } + + return true; } } diff --git a/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java b/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java index ba487f5..749a0f6 100644 --- a/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java +++ b/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java @@ -211,15 +211,18 @@ public void run() { // Setup crawler. CrawlIndex crawlIndex = new CrawlIndex(index, indexFile); - crawlIndex.crawlAndSend(output, batchSize); + boolean success = crawlIndex.crawlAndSend(output, batchSize); // Delete the index file. indexFile.delete(); - logger.info("Index " + index + " successfully crawled."); - - // Update (and increment) the index. - updateIndex(); + if (success) { + logger.info("Index " + index + " successfully crawled."); + // Update (and increment) the index. + updateIndex(); + } else { + logger.warn("Failed crawling index " + index + ". Will retry on next interval."); + } } /** diff --git a/src/main/java/eu/fasten/crawler/output/KafkaOutput.java b/src/main/java/eu/fasten/crawler/output/KafkaOutput.java index 93a641d..d6599e3 100644 --- a/src/main/java/eu/fasten/crawler/output/KafkaOutput.java +++ b/src/main/java/eu/fasten/crawler/output/KafkaOutput.java @@ -53,20 +53,25 @@ public void flush() { * @param artifact the artifacts (we expect it to be of size batch size). */ @Override - public void send(List artifact) { + public boolean send(List artifact) { List> records = artifact .stream() .map((x) -> new ProducerRecord(topic, null, x.getTimestamp(), null, x.toString())) .collect(Collectors.toList()); - records.stream().map((r) -> producer.send(r)).parallel().forEach((f) -> { + boolean result = records.stream().map((r) -> producer.send(r)).parallel().map((f) -> { try { f.get(); + return true; } catch (InterruptedException e) { e.printStackTrace(); + return false; } catch (ExecutionException e) { e.printStackTrace(); + return false; } - }); + }).reduce(true, (x, y) -> x && y); + + return result; } } diff --git a/src/main/java/eu/fasten/crawler/output/Output.java b/src/main/java/eu/fasten/crawler/output/Output.java index 9cb2060..6f7493d 100644 --- a/src/main/java/eu/fasten/crawler/output/Output.java +++ b/src/main/java/eu/fasten/crawler/output/Output.java @@ -8,13 +8,13 @@ public interface Output { /** Helper methods for constructing and cleaning up the output instance. **/ - void open(); - void close(); - void flush(); + default void open() {} + default void close() {} + default void flush() {} /** Send records to output. **/ - default void send(MavenArtifact artifact) { - send(Arrays.asList(artifact)); + default boolean send(MavenArtifact artifact) { + return send(Arrays.asList(artifact)); } - void send(List artifact); + boolean send(List artifact); } diff --git a/src/main/java/eu/fasten/crawler/output/RestOutput.java b/src/main/java/eu/fasten/crawler/output/RestOutput.java new file mode 100644 index 0000000..4c16c9d --- /dev/null +++ b/src/main/java/eu/fasten/crawler/output/RestOutput.java @@ -0,0 +1,42 @@ +package eu.fasten.crawler.output; + +import eu.fasten.crawler.data.MavenArtifact; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.HttpClients; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +public class RestOutput implements Output { + + private Logger logger = LoggerFactory.getLogger(this.getClass()); + + private String endpoint; + + public RestOutput(String endpoint) { + this.endpoint = endpoint; + } + + @Override + public boolean send(List artifact) { + HttpClient httpcClient = HttpClients.createDefault(); + HttpPost httpPost = new HttpPost(endpoint); + + try { + int responseCode = httpcClient.execute(httpPost).getStatusLine().getStatusCode(); + + if (responseCode == 200) { + return true; + } else { + return false; + } + } catch (IOException e) { + logger.error("Failed sending to rest endpoint. ", e); + return false; + } + } +} diff --git a/src/main/java/eu/fasten/crawler/output/StdOutput.java b/src/main/java/eu/fasten/crawler/output/StdOutput.java index 029a67e..5644c49 100644 --- a/src/main/java/eu/fasten/crawler/output/StdOutput.java +++ b/src/main/java/eu/fasten/crawler/output/StdOutput.java @@ -6,21 +6,13 @@ public class StdOutput implements Output { - @Override - public void open() {} - - @Override - public void close() {} - - @Override - public void flush() {} - /** * Prints the artifacts to the screen. * @param artifact list of artifacts. */ @Override - public void send(List artifact) { + public boolean send(List artifact) { artifact.stream().map((a) -> a.toString()).forEach((a) -> {System.out.println(a);}); + return true; } } From ad1f1dff6b91ca47fec8154af7b8d16cfa25fc65 Mon Sep 17 00:00:00 2001 From: Wouter Zorgdrager Date: Fri, 5 Feb 2021 12:03:52 +0100 Subject: [PATCH 02/10] Fix tests --- src/main/java/eu/fasten/crawler/output/RestOutput.java | 2 +- .../java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/eu/fasten/crawler/output/RestOutput.java b/src/main/java/eu/fasten/crawler/output/RestOutput.java index 4c16c9d..bbaf85f 100644 --- a/src/main/java/eu/fasten/crawler/output/RestOutput.java +++ b/src/main/java/eu/fasten/crawler/output/RestOutput.java @@ -16,7 +16,7 @@ public class RestOutput implements Output { private Logger logger = LoggerFactory.getLogger(this.getClass()); private String endpoint; - + public RestOutput(String endpoint) { this.endpoint = endpoint; } diff --git a/src/test/java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java b/src/test/java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java index 9d5e3ae..3d69fae 100644 --- a/src/test/java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java +++ b/src/test/java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java @@ -138,8 +138,7 @@ public void testSuccessfulCrawl() { IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 50, stdOutput, "src/test/resources/"); - doNothing().when(stdOutput).send(anyList()); - + when(stdOutput.send(anyList())).thenReturn(true); crawler.run(); assertTrue(new File("src/test/resources/" + (index + 1) + ".index").exists()); From 232388b580877333519f536ecc03d9b269190d37 Mon Sep 17 00:00:00 2001 From: Wouter Zorgdrager Date: Fri, 5 Feb 2021 13:58:28 +0100 Subject: [PATCH 03/10] Some refactoring. --- .../eu/fasten/crawler/output/KafkaOutput.java | 2 +- .../java/eu/fasten/crawler/output/Output.java | 1 + .../eu/fasten/crawler/output/RestOutput.java | 39 +++++++++++++++++-- .../eu/fasten/crawler/CrawlIndexTest.java | 25 ++++++++++-- .../crawler/IncrementalMavenCrawlerTest.java | 14 +++++++ 5 files changed, 74 insertions(+), 7 deletions(-) diff --git a/src/main/java/eu/fasten/crawler/output/KafkaOutput.java b/src/main/java/eu/fasten/crawler/output/KafkaOutput.java index d6599e3..2607be1 100644 --- a/src/main/java/eu/fasten/crawler/output/KafkaOutput.java +++ b/src/main/java/eu/fasten/crawler/output/KafkaOutput.java @@ -70,7 +70,7 @@ public boolean send(List artifact) { e.printStackTrace(); return false; } - }).reduce(true, (x, y) -> x && y); + }).noneMatch((x) -> x == false); return result; } diff --git a/src/main/java/eu/fasten/crawler/output/Output.java b/src/main/java/eu/fasten/crawler/output/Output.java index 6f7493d..71aa1ed 100644 --- a/src/main/java/eu/fasten/crawler/output/Output.java +++ b/src/main/java/eu/fasten/crawler/output/Output.java @@ -16,5 +16,6 @@ default void flush() {} default boolean send(MavenArtifact artifact) { return send(Arrays.asList(artifact)); } + boolean send(List artifact); } diff --git a/src/main/java/eu/fasten/crawler/output/RestOutput.java b/src/main/java/eu/fasten/crawler/output/RestOutput.java index bbaf85f..966df67 100644 --- a/src/main/java/eu/fasten/crawler/output/RestOutput.java +++ b/src/main/java/eu/fasten/crawler/output/RestOutput.java @@ -4,11 +4,13 @@ import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.HttpClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.util.List; public class RestOutput implements Output { @@ -17,21 +19,44 @@ public class RestOutput implements Output { private String endpoint; + /** + * Setup RestOutput. + * @param endpoint the http url to POST to. + */ public RestOutput(String endpoint) { this.endpoint = endpoint; } @Override public boolean send(List artifact) { - HttpClient httpcClient = HttpClients.createDefault(); - HttpPost httpPost = new HttpPost(endpoint); + // Setup connections. + HttpClient httpClient = HttpClients.createDefault(); + HttpPost httpPost = constructPostRequest(); + + + // Build array of JSON objects. + StringBuffer json = new StringBuffer(); + json.append("["); + + for (MavenArtifact af : artifact) { + json.append(af.toString() + ","); + } + + json.deleteCharAt(json.length() - 1); + json.append("]"); + try { - int responseCode = httpcClient.execute(httpPost).getStatusLine().getStatusCode(); + // Send batch. + StringEntity jsonList = new StringEntity(json.toString()); + httpPost.setEntity(jsonList); + int responseCode = httpClient.execute(httpPost).getStatusLine().getStatusCode(); + // If we don't get a 200, return false. if (responseCode == 200) { return true; } else { + logger.warn("Expected response 200, but got " + responseCode); return false; } } catch (IOException e) { @@ -39,4 +64,12 @@ public boolean send(List artifact) { return false; } } + + /** + * Constructs a PostRequest. + * @return a HTTPPost. + */ + public HttpPost constructPostRequest() { + return new HttpPost(this.endpoint); + } } diff --git a/src/test/java/eu/fasten/crawler/CrawlIndexTest.java b/src/test/java/eu/fasten/crawler/CrawlIndexTest.java index c5f6028..e19e13c 100644 --- a/src/test/java/eu/fasten/crawler/CrawlIndexTest.java +++ b/src/test/java/eu/fasten/crawler/CrawlIndexTest.java @@ -12,7 +12,7 @@ import java.util.List; import java.util.concurrent.Future; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyList; import static org.mockito.Mockito.*; @@ -38,12 +38,30 @@ public void testIndexSetupFullRun() { CrawlIndex index = new CrawlIndex(600, f); StdOutput mockStd = mock(StdOutput.class); - index.crawlAndSend(mockStd, 50); + when(mockStd.send(anyList())).thenReturn(true); + + boolean res = index.crawlAndSend(mockStd, 50); + + verify(mockStd, atLeastOnce()).send(anyList()); + assertTrue(res); + f.delete(); + } + + @Test + public void testIndexSetupFullRunFailure() { + File f = DownloadIndex.download(600); + CrawlIndex index = new CrawlIndex(600, f); + StdOutput mockStd = mock(StdOutput.class); + + when(mockStd.send(anyList())).thenReturn(false); + boolean res = index.crawlAndSend(mockStd, 50); verify(mockStd, atLeastOnce()).send(anyList()); + assertFalse(res); f.delete(); } + @Test public void testIndexSetupFullRunKafka() throws IllegalAccessException { File f = DownloadIndex.download(600); @@ -58,9 +76,10 @@ public void testIndexSetupFullRunKafka() throws IllegalAccessException { FieldUtils.writeField(kafkaOutput, "producer", prod, true); - index.crawlAndSend(kafkaOutput, 50); + boolean res = index.crawlAndSend(kafkaOutput, 50); verify(kafkaOutput, atLeastOnce()).send(anyList()); + assertTrue(res); f.delete(); } } diff --git a/src/test/java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java b/src/test/java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java index 3d69fae..d9d53cf 100644 --- a/src/test/java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java +++ b/src/test/java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java @@ -147,4 +147,18 @@ public void testSuccessfulCrawl() { new File("src/test/resources/" + (index + 1) + ".index").delete(); } + + @Test + public void testFailedCrawl() { + int index = 680; + StdOutput stdOutput = spy(new StdOutput()); + + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 50, stdOutput, "src/test/resources/"); + + when(stdOutput.send(anyList())).thenReturn(false); + crawler.run(); + + verify(stdOutput, atLeastOnce()).send(anyList()); + assertEquals(index, crawler.getIndex()); + } } From cddae76e6b69196a1ff8f2d925c012414a02581a Mon Sep 17 00:00:00 2001 From: Wouter Zorgdrager Date: Fri, 5 Feb 2021 14:11:37 +0100 Subject: [PATCH 04/10] Specific Kafka tests --- .../crawler/output/KafkaOutputTest.java | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 src/test/java/eu/fasten/crawler/output/KafkaOutputTest.java diff --git a/src/test/java/eu/fasten/crawler/output/KafkaOutputTest.java b/src/test/java/eu/fasten/crawler/output/KafkaOutputTest.java new file mode 100644 index 0000000..2d90df1 --- /dev/null +++ b/src/test/java/eu/fasten/crawler/output/KafkaOutputTest.java @@ -0,0 +1,63 @@ +package eu.fasten.crawler.output; + +import eu.fasten.crawler.data.MavenArtifact; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.junit.Test; +import org.mockito.Spy; + +import java.util.List; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doReturn; + +public class KafkaOutputTest { + + @Test + public void testKafkaOutputSuccessfulSend() throws IllegalAccessException { + MavenArtifact artifactOne = new MavenArtifact("a", "g", "1", 0L); + MavenArtifact artifactTwo = new MavenArtifact("a", "g", "2", 0L); + + KafkaOutput kafkaOutput = spy(new KafkaOutput("", "", 50)); + + KafkaProducer prod = mock(KafkaProducer.class); + Future fut = mock(Future.class); + + doNothing().when(kafkaOutput).open(); + doReturn(fut).when(prod).send(any()); + + FieldUtils.writeField(kafkaOutput, "producer", prod, true); + + boolean res = kafkaOutput.send(List.of(artifactOne, artifactTwo)); + assertTrue(res); + } + + @Test + public void testKafkaOutputFailedSend() throws Exception { + MavenArtifact artifactOne = new MavenArtifact("a", "g", "1", 0L); + MavenArtifact artifactTwo = new MavenArtifact("a", "g", "2", 0L); + + KafkaOutput kafkaOutput = spy(new KafkaOutput("", "", 50)); + + KafkaProducer prod = mock(KafkaProducer.class); + Future fut = mock(Future.class); + Future futTwo = mock(Future.class); + + doNothing().when(kafkaOutput).open(); + doReturn(fut).when(prod).send(new ProducerRecord("", null, artifactOne.getTimestamp(), null, artifactOne.toString())); + doReturn(futTwo).when(prod).send(new ProducerRecord("", null, artifactTwo.getTimestamp(), null, artifactTwo.toString())); + + when(futTwo.get()).thenThrow(InterruptedException.class); + + FieldUtils.writeField(kafkaOutput, "producer", prod, true); + + boolean res = kafkaOutput.send(List.of(artifactOne, artifactTwo)); + assertFalse(res); + } +} From c07e15101c32f3bc4999848402cd231c554e5820 Mon Sep 17 00:00:00 2001 From: Wouter Zorgdrager Date: Fri, 5 Feb 2021 14:14:19 +0100 Subject: [PATCH 05/10] Move some code around --- .../java/eu/fasten/crawler/CrawlIndex.java | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/src/main/java/eu/fasten/crawler/CrawlIndex.java b/src/main/java/eu/fasten/crawler/CrawlIndex.java index 8b8a1f5..ea157a2 100644 --- a/src/main/java/eu/fasten/crawler/CrawlIndex.java +++ b/src/main/java/eu/fasten/crawler/CrawlIndex.java @@ -101,24 +101,12 @@ public List getIndexCreators() */ public boolean crawlAndSend(Output output, int batchSize) { nonUnique = 0; - Set artifactSet = new HashSet<>(); + HashSet artifactSet = new HashSet<>(); // Setup output. output.open(); - IndexDataReader.IndexDataReadVisitor visitor = (doc) -> { - MavenArtifact artifact = MavenArtifact.fromDocument(doc, context); - if (artifact == null) { - logger.warn("Couldn't construct artifact info for document: " + doc.toString() + ". We will skip it."); - return; - } - - if (artifactSet.contains(artifact)) { - nonUnique += 1; - } else { - artifactSet.add(artifact); - } - }; + IndexDataReader.IndexDataReadVisitor visitor = setupUniqueVisitor(artifactSet); try { IndexDataReader.IndexDataReadResult result = reader.readIndex(visitor, context); @@ -152,5 +140,23 @@ public boolean crawlAndSend(Output output, int batchSize) { return true; } + + public IndexDataReader.IndexDataReadVisitor setupUniqueVisitor(HashSet artifactSet) { + IndexDataReader.IndexDataReadVisitor visitor = (doc) -> { + MavenArtifact artifact = MavenArtifact.fromDocument(doc, context); + if (artifact == null) { + logger.warn("Couldn't construct artifact info for document: " + doc.toString() + ". We will skip it."); + return; + } + + if (artifactSet.contains(artifact)) { + nonUnique += 1; + } else { + artifactSet.add(artifact); + } + }; + + return visitor; + } } From 33c9703287c984222415f647fc7232132661ec93 Mon Sep 17 00:00:00 2001 From: Wouter Zorgdrager Date: Fri, 5 Feb 2021 14:41:46 +0100 Subject: [PATCH 06/10] Finalize REST --- README.md | 7 +- .../java/eu/fasten/crawler/CrawlIndex.java | 7 +- .../crawler/IncrementalMavenCrawler.java | 18 ++++ .../eu/fasten/crawler/output/RestOutput.java | 49 ++++++---- .../fasten/crawler/output/RestOutputTest.java | 98 +++++++++++++++++++ 5 files changed, 160 insertions(+), 19 deletions(-) create mode 100644 src/test/java/eu/fasten/crawler/output/RestOutputTest.java diff --git a/README.md b/README.md index c004ca3..74846e9 100644 --- a/README.md +++ b/README.md @@ -20,8 +20,13 @@ usage: IncrementalMavenCrawler crawled index. Used for recovery on crash or restart. Optional. -kb,--kafka_brokers Kafka brokers to connect with. I.e. - broker1:port,broker2:port,... Optional. + broker1:port,broker2:port,... + Required for Kafka output. -kt,--kafka_topic Kafka topic to produce to. + Required for Kafka output. + -re,--rest_endpoint HTTP endpoint to post crawled batches to. + Required for Rest output. + ``` ### Outputs diff --git a/src/main/java/eu/fasten/crawler/CrawlIndex.java b/src/main/java/eu/fasten/crawler/CrawlIndex.java index ea157a2..ca012ec 100644 --- a/src/main/java/eu/fasten/crawler/CrawlIndex.java +++ b/src/main/java/eu/fasten/crawler/CrawlIndex.java @@ -140,7 +140,12 @@ public boolean crawlAndSend(Output output, int batchSize) { return true; } - + + /** + * Setup DataReadVisitor which fills a set with unique artifacts. + * @param artifactSet reference to the set which will be filled with unique artifacts. + * @return the visitor. + */ public IndexDataReader.IndexDataReadVisitor setupUniqueVisitor(HashSet artifactSet) { IndexDataReader.IndexDataReadVisitor visitor = (doc) -> { MavenArtifact artifact = MavenArtifact.fromDocument(doc, context); diff --git a/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java b/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java index 749a0f6..61b189a 100644 --- a/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java +++ b/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java @@ -2,6 +2,7 @@ import eu.fasten.crawler.output.KafkaOutput; import eu.fasten.crawler.output.Output; +import eu.fasten.crawler.output.RestOutput; import eu.fasten.crawler.output.StdOutput; import org.apache.commons.cli.*; import org.codehaus.plexus.util.FileUtils; @@ -76,6 +77,13 @@ public class IncrementalMavenCrawler implements Runnable { .desc("Kafka brokers to connect with. I.e. broker1:port,broker2:port,... Optional.") .build(); + static Option optRestEndpoint = Option.builder("re") + .longOpt("rest_endpoint") + .hasArg() + .argName("url") + .desc("HTTP endpoint to post crawled batches to.") + .build(); + public static void main(String[] args) { options.addOption(optStartIndex); options.addOption(optBatchSize); @@ -84,6 +92,7 @@ public static void main(String[] args) { options.addOption(optCheckpointDir); options.addOption(optKafkaTopic); options.addOption(optKafkaBrokers); + options.addOption(optRestEndpoint); CommandLineParser parser = new DefaultParser(); @@ -105,6 +114,8 @@ public static void main(String[] args) { // Setup Kafka. if (properties.get("output").equals("kafka")) { output = new KafkaOutput(properties.getProperty("kafka_topic"), properties.getProperty("kafka_brokers"), batchSize); + } else if (properties.get("output").equals("rest")) { + output = new RestOutput(properties.getProperty("rest_endpoint")); } // Start cralwer and execute it with an interval. @@ -126,6 +137,10 @@ public static Properties verifyAndParseArguments(CommandLine cmd) throws ParseEx throw new ParseException("Configured output to be Kafka, but no `kafka_topic` or `kafka_brokers` have been configured."); } + if (cmd.getOptionValue("output").equals("rest") && !(cmd.hasOption("rest_endpoint"))) { + throw new ParseException("Configured output to be Rest, but no `rest_endpoint` has been configured."); + } + props.setProperty("index", cmd.getOptionValue("start_index", "0")); props.setProperty("batch_size", cmd.getOptionValue("batch_size", "50")); props.setProperty("output", cmd.getOptionValue("output", "std")); @@ -133,6 +148,7 @@ public static Properties verifyAndParseArguments(CommandLine cmd) throws ParseEx props.setProperty("checkpoint_dir", cmd.getOptionValue("checkpoint_dir", "")); props.setProperty("kafka_topic", cmd.getOptionValue("kafka_topic", "")); props.setProperty("kafka_brokers", cmd.getOptionValue("kafka_brokers", "")); + props.setProperty("rest_endpoint", cmd.getOptionValue("rest_endpoint", "")); return props; } @@ -163,6 +179,8 @@ public IncrementalMavenCrawler(int startIndex, int batchSize, Output output, Str if (this.index > startIndex) { logger.info("Found (checkpointed) index in " + checkpointDir + ". Will start crawling from index " + this.index); } + + logger.info("Starting IncrementalMavenCrawler with index: " + this.index + ", batch size: " + batchSize + " and output " + output.getClass().getSimpleName() + "."); } /** diff --git a/src/main/java/eu/fasten/crawler/output/RestOutput.java b/src/main/java/eu/fasten/crawler/output/RestOutput.java index 966df67..79086e4 100644 --- a/src/main/java/eu/fasten/crawler/output/RestOutput.java +++ b/src/main/java/eu/fasten/crawler/output/RestOutput.java @@ -28,27 +28,14 @@ public RestOutput(String endpoint) { } @Override - public boolean send(List artifact) { + public boolean send(List artifacts) { // Setup connections. - HttpClient httpClient = HttpClients.createDefault(); + HttpClient httpClient = constructHttpClient(); HttpPost httpPost = constructPostRequest(); - - // Build array of JSON objects. - StringBuffer json = new StringBuffer(); - json.append("["); - - for (MavenArtifact af : artifact) { - json.append(af.toString() + ","); - } - - json.deleteCharAt(json.length() - 1); - json.append("]"); - - try { // Send batch. - StringEntity jsonList = new StringEntity(json.toString()); + StringEntity jsonList = new StringEntity(buildJsonList(artifacts)); httpPost.setEntity(jsonList); int responseCode = httpClient.execute(httpPost).getStatusLine().getStatusCode(); @@ -56,7 +43,7 @@ public boolean send(List artifact) { if (responseCode == 200) { return true; } else { - logger.warn("Expected response 200, but got " + responseCode); + logger.error("Expected response 200, but got " + responseCode); return false; } } catch (IOException e) { @@ -65,6 +52,26 @@ public boolean send(List artifact) { } } + /** + * Builds a json list of all artifacts. + * @param artifacts all artifacts. + * @return a stringified list of all artifacts. + */ + public String buildJsonList(List artifacts) { + // Build array of JSON objects. + StringBuffer json = new StringBuffer(); + json.append("["); + + for (MavenArtifact af : artifacts) { + json.append(af.toString() + ","); + } + + json.deleteCharAt(json.length() - 1); + json.append("]"); + + return json.toString(); + } + /** * Constructs a PostRequest. * @return a HTTPPost. @@ -72,4 +79,12 @@ public boolean send(List artifact) { public HttpPost constructPostRequest() { return new HttpPost(this.endpoint); } + + /** + * Constructs a HTTPClient. + * @return a HttpClient. + */ + public HttpClient constructHttpClient() { + return HttpClients.createDefault(); + } } diff --git a/src/test/java/eu/fasten/crawler/output/RestOutputTest.java b/src/test/java/eu/fasten/crawler/output/RestOutputTest.java new file mode 100644 index 0000000..33b3e03 --- /dev/null +++ b/src/test/java/eu/fasten/crawler/output/RestOutputTest.java @@ -0,0 +1,98 @@ +package eu.fasten.crawler.output; + +import eu.fasten.crawler.data.MavenArtifact; +import org.apache.http.HttpResponse; +import org.apache.http.StatusLine; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.HttpClients; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class RestOutputTest { + + @Test + public void testRestOutputFailed() throws Exception { + RestOutput output = spy(new RestOutput("")); + + HttpClient client = mock(HttpClient.class); + HttpPost post = mock(HttpPost.class); + + when(output.constructHttpClient()).thenReturn(client); + when(output.constructPostRequest()).thenReturn(post); + + when(client.execute(post)).thenThrow(IOException.class); + boolean res = output.send(new MavenArtifact("a", "b", "c", 0L)); + + assertFalse(res); + } + + @Test + public void testRestOutputFailedStatus() throws Exception { + RestOutput output = spy(new RestOutput("")); + + HttpClient client = mock(HttpClient.class); + HttpPost post = mock(HttpPost.class); + HttpResponse response = mock(HttpResponse.class); + StatusLine line = mock(StatusLine.class); + + when(output.constructHttpClient()).thenReturn(client); + when(output.constructPostRequest()).thenReturn(post); + when(client.execute(post)).thenReturn(response); + when(response.getStatusLine()).thenReturn(line); + when(line.getStatusCode()).thenReturn(201); + + boolean res = output.send(new MavenArtifact("a", "b", "c", 0L)); + + assertFalse(res); + } + + @Test + public void testRestOutputSuccessStatus() throws Exception { + RestOutput output = spy(new RestOutput("")); + + HttpClient client = mock(HttpClient.class); + HttpPost post = mock(HttpPost.class); + HttpResponse response = mock(HttpResponse.class); + StatusLine line = mock(StatusLine.class); + + when(output.constructHttpClient()).thenReturn(client); + when(output.constructPostRequest()).thenReturn(post); + when(client.execute(post)).thenReturn(response); + when(response.getStatusLine()).thenReturn(line); + when(line.getStatusCode()).thenReturn(200); + + boolean res = output.send(new MavenArtifact("a", "b", "c", 0L)); + + assertTrue(res); + } + + @Test + public void testVerifyJSON() throws Exception { + RestOutput output = spy(new RestOutput("")); + MavenArtifact af = new MavenArtifact("a", "b", "c", 0L); + String list = output.buildJsonList(List.of(af)); + + + assertEquals("[" + af.toString() + "]", list); + } + + @Test + public void testVerifyJSONList() throws Exception { + RestOutput output = spy(new RestOutput("")); + MavenArtifact af = new MavenArtifact("a", "b", "c", 0L); + MavenArtifact af2 = new MavenArtifact("a", "b", "d", 0L); + String list = output.buildJsonList(List.of(af, af2)); + + + System.out.println(list); + + assertEquals("[" + af.toString() + "," + af2 + "]", list); + } +} From c78d6690e98550da45fd2d189f3a7c67f6b493c7 Mon Sep 17 00:00:00 2001 From: Wouter Zorgdrager Date: Fri, 5 Feb 2021 14:43:50 +0100 Subject: [PATCH 07/10] Update IncrementalMavenCrawler.java --- .../crawler/IncrementalMavenCrawler.java | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java b/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java index 61b189a..47fface 100644 --- a/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java +++ b/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java @@ -85,15 +85,7 @@ public class IncrementalMavenCrawler implements Runnable { .build(); public static void main(String[] args) { - options.addOption(optStartIndex); - options.addOption(optBatchSize); - options.addOption(optOutputType); - options.addOption(optCrawlInterval); - options.addOption(optCheckpointDir); - options.addOption(optKafkaTopic); - options.addOption(optKafkaBrokers); - options.addOption(optRestEndpoint); - + addOptions(); CommandLineParser parser = new DefaultParser(); Properties properties; @@ -124,6 +116,17 @@ public static void main(String[] args) { service.scheduleAtFixedRate(crawler, 0, interval, TimeUnit.HOURS); } + public static void addOptions() { + options.addOption(optStartIndex); + options.addOption(optBatchSize); + options.addOption(optOutputType); + options.addOption(optCrawlInterval); + options.addOption(optCheckpointDir); + options.addOption(optKafkaTopic); + options.addOption(optKafkaBrokers); + options.addOption(optRestEndpoint); + } + /** * Verify and stores arguments in properties instance. * @param cmd the parsed command line arguments. From df9b4cfae1442d534a3a50284c941399cbe04313 Mon Sep 17 00:00:00 2001 From: Wouter Zorgdrager Date: Fri, 5 Feb 2021 14:54:18 +0100 Subject: [PATCH 08/10] Update IncrementalMavenCrawlerTest.java --- .../crawler/IncrementalMavenCrawlerTest.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/test/java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java b/src/test/java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java index d9d53cf..7070cee 100644 --- a/src/test/java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java +++ b/src/test/java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java @@ -22,7 +22,7 @@ public static void beforeAll() { @Test public void testCheckpointDisabled() { int index = 0; - IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, null, ""); + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, new StdOutput(), ""); assertEquals(index, crawler.getIndex()); } @@ -34,7 +34,7 @@ public void testCheckpointEnabledNoOverride() throws IOException { file.mkdirs(); file.createNewFile(); - IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, null, "src/test/resources/"); + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, new StdOutput(), "src/test/resources/"); assertEquals(index, crawler.getIndex()); file.delete(); } @@ -47,7 +47,7 @@ public void testCheckpointEnabledOverride() throws IOException { file.mkdirs(); file.createNewFile(); - IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, null, "src/test/resources/"); + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, new StdOutput(), "src/test/resources/"); assertNotEquals(index, crawler.getIndex()); assertEquals(1, crawler.getIndex()); file.delete(); @@ -65,7 +65,7 @@ public void testCheckpointEnabledMultipleOverride() throws IOException { file2.mkdirs(); file2.createNewFile(); - IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, null, "src/test/resources/"); + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, new StdOutput(), "src/test/resources/"); assertNotEquals(index, crawler.getIndex()); assertEquals(5, crawler.getIndex()); @@ -79,7 +79,7 @@ public void testUpdateIndexNoCheckpoint() throws IOException { File file = new File("src/test/resources/1.index"); - IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, null, ""); + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, new StdOutput(), ""); crawler.updateIndex(); assertFalse(file.exists()); @@ -91,7 +91,7 @@ public void testUpdateIndexCheckpoint() throws IOException { int index = 0; - IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, null, "src/test/resources/"); + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, new StdOutput(), "src/test/resources/"); crawler.updateIndex(); File file = new File("src/test/resources/1.index"); @@ -113,7 +113,7 @@ public void testUpdateIndexCheckpointEnabledMultipleOverride() throws IOExceptio file2.mkdirs(); file2.createNewFile(); - IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, null, "src/test/resources/"); + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, new StdOutput(), "src/test/resources/"); crawler.updateIndex(); assertFalse(file.exists()); assertFalse(file2.exists()); @@ -126,7 +126,7 @@ public void testUpdateIndexCheckpointEnabledMultipleOverride() throws IOExceptio @Test public void testNonExistentIndex() { int index = 9999999; - IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, null, "src/test/resources/"); + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, new StdOutput(), "src/test/resources/"); crawler.run(); assertEquals(index, crawler.getIndex()); } From 4e7ef38f0f4a194dc027c85f20c108248aa0af7e Mon Sep 17 00:00:00 2001 From: Wouter Zorgdrager Date: Fri, 5 Feb 2021 14:57:50 +0100 Subject: [PATCH 09/10] Remove another MavenArtifact dependency --- .../java/eu/fasten/crawler/output/RestOutput.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/eu/fasten/crawler/output/RestOutput.java b/src/main/java/eu/fasten/crawler/output/RestOutput.java index 79086e4..badef66 100644 --- a/src/main/java/eu/fasten/crawler/output/RestOutput.java +++ b/src/main/java/eu/fasten/crawler/output/RestOutput.java @@ -12,6 +12,7 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.List; +import java.util.stream.Collectors; public class RestOutput implements Output { @@ -35,7 +36,7 @@ public boolean send(List artifacts) { try { // Send batch. - StringEntity jsonList = new StringEntity(buildJsonList(artifacts)); + StringEntity jsonList = new StringEntity(buildJsonList(artifacts.stream().map((x) -> x.toString()).collect(Collectors.toList()))); httpPost.setEntity(jsonList); int responseCode = httpClient.execute(httpPost).getStatusLine().getStatusCode(); @@ -54,16 +55,16 @@ public boolean send(List artifacts) { /** * Builds a json list of all artifacts. - * @param artifacts all artifacts. + * @param artifactStrings all artifacts. * @return a stringified list of all artifacts. */ - public String buildJsonList(List artifacts) { + public String buildJsonList(List artifactStrings) { // Build array of JSON objects. StringBuffer json = new StringBuffer(); json.append("["); - for (MavenArtifact af : artifacts) { - json.append(af.toString() + ","); + for (String af : artifactStrings) { + json.append(af + ","); } json.deleteCharAt(json.length() - 1); From 885f94525e80f6fbad3eb2690d5a750ef07409b1 Mon Sep 17 00:00:00 2001 From: Wouter Zorgdrager Date: Fri, 5 Feb 2021 15:07:22 +0100 Subject: [PATCH 10/10] Move to outputfactor --- .../fasten/crawler/IncrementalMavenCrawler.java | 14 ++------------ .../eu/fasten/crawler/output/OutputFactory.java | 17 +++++++++++++++++ .../eu/fasten/crawler/output/RestOutput.java | 11 +++++------ 3 files changed, 24 insertions(+), 18 deletions(-) create mode 100644 src/main/java/eu/fasten/crawler/output/OutputFactory.java diff --git a/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java b/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java index 47fface..c971b9c 100644 --- a/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java +++ b/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java @@ -1,9 +1,6 @@ package eu.fasten.crawler; -import eu.fasten.crawler.output.KafkaOutput; -import eu.fasten.crawler.output.Output; -import eu.fasten.crawler.output.RestOutput; -import eu.fasten.crawler.output.StdOutput; +import eu.fasten.crawler.output.*; import org.apache.commons.cli.*; import org.codehaus.plexus.util.FileUtils; import org.slf4j.Logger; @@ -97,18 +94,11 @@ public static void main(String[] args) { } // Setup arguments for crawler. - Output output = new StdOutput(); int batchSize = Integer.parseInt(properties.getProperty("batch_size")); int startIndex = Integer.parseInt(properties.getProperty("index")); int interval = Integer.parseInt(properties.getProperty("interval")); String checkpointDir = properties.getProperty("checkpoint_dir"); - - // Setup Kafka. - if (properties.get("output").equals("kafka")) { - output = new KafkaOutput(properties.getProperty("kafka_topic"), properties.getProperty("kafka_brokers"), batchSize); - } else if (properties.get("output").equals("rest")) { - output = new RestOutput(properties.getProperty("rest_endpoint")); - } + Output output = OutputFactory.getOutput(properties.getProperty("output"), properties); // Start cralwer and execute it with an interval. IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(startIndex, batchSize, output, checkpointDir); diff --git a/src/main/java/eu/fasten/crawler/output/OutputFactory.java b/src/main/java/eu/fasten/crawler/output/OutputFactory.java new file mode 100644 index 0000000..155358d --- /dev/null +++ b/src/main/java/eu/fasten/crawler/output/OutputFactory.java @@ -0,0 +1,17 @@ +package eu.fasten.crawler.output; + +import java.util.Properties; + +public class OutputFactory { + + public static Output getOutput(String outputName, Properties properties) { + switch (outputName) { + case "kafka": + return new KafkaOutput(properties.getProperty("kafka_topic"), properties.getProperty("kafka_brokers"), Integer.parseInt(properties.getProperty("batch_size"))); + case "rest": + return new RestOutput(properties.getProperty("rest_endpoint")); + default: + return new StdOutput(); + } + } +} diff --git a/src/main/java/eu/fasten/crawler/output/RestOutput.java b/src/main/java/eu/fasten/crawler/output/RestOutput.java index badef66..79086e4 100644 --- a/src/main/java/eu/fasten/crawler/output/RestOutput.java +++ b/src/main/java/eu/fasten/crawler/output/RestOutput.java @@ -12,7 +12,6 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.List; -import java.util.stream.Collectors; public class RestOutput implements Output { @@ -36,7 +35,7 @@ public boolean send(List artifacts) { try { // Send batch. - StringEntity jsonList = new StringEntity(buildJsonList(artifacts.stream().map((x) -> x.toString()).collect(Collectors.toList()))); + StringEntity jsonList = new StringEntity(buildJsonList(artifacts)); httpPost.setEntity(jsonList); int responseCode = httpClient.execute(httpPost).getStatusLine().getStatusCode(); @@ -55,16 +54,16 @@ public boolean send(List artifacts) { /** * Builds a json list of all artifacts. - * @param artifactStrings all artifacts. + * @param artifacts all artifacts. * @return a stringified list of all artifacts. */ - public String buildJsonList(List artifactStrings) { + public String buildJsonList(List artifacts) { // Build array of JSON objects. StringBuffer json = new StringBuffer(); json.append("["); - for (String af : artifactStrings) { - json.append(af + ","); + for (MavenArtifact af : artifacts) { + json.append(af.toString() + ","); } json.deleteCharAt(json.length() - 1);