diff --git a/analyzer/debian-license-feeder/src/main/java/eu/fasten/analyzer/debianlicensefeeder/DebianLicenseFeederPlugin.java b/analyzer/debian-license-feeder/src/main/java/eu/fasten/analyzer/debianlicensefeeder/DebianLicenseFeederPlugin.java index 68f3198ff..636cca9eb 100644 --- a/analyzer/debian-license-feeder/src/main/java/eu/fasten/analyzer/debianlicensefeeder/DebianLicenseFeederPlugin.java +++ b/analyzer/debian-license-feeder/src/main/java/eu/fasten/analyzer/debianlicensefeeder/DebianLicenseFeederPlugin.java @@ -2,9 +2,6 @@ import eu.fasten.core.data.Constants; import eu.fasten.core.data.metadatadb.MetadataDao; -//This import should probably be different -import eu.fasten.core.maven.data.Revision; -import eu.fasten.core.plugins.AbstractKafkaPlugin; import eu.fasten.core.plugins.DBConnector; import eu.fasten.core.plugins.KafkaPlugin; import org.jooq.DSLContext; @@ -17,8 +14,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Timestamp; -import java.util.*; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +//This import should probably be different public class DebianLicenseFeederPlugin extends Plugin { @@ -82,7 +84,7 @@ public void consume(String record) { } catch (Exception e) { // Fasten error-handling guidelines logger.error(e.getMessage(), e.getCause()); - setPluginError(e); + throw e; } } diff --git a/analyzer/license-feeder/src/main/java/eu/fasten/analyzer/licensefeeder/LicenseFeederPlugin.java b/analyzer/license-feeder/src/main/java/eu/fasten/analyzer/licensefeeder/LicenseFeederPlugin.java index f97848524..6a5e343dc 100644 --- a/analyzer/license-feeder/src/main/java/eu/fasten/analyzer/licensefeeder/LicenseFeederPlugin.java +++ b/analyzer/license-feeder/src/main/java/eu/fasten/analyzer/licensefeeder/LicenseFeederPlugin.java @@ -3,7 +3,6 @@ import eu.fasten.core.data.Constants; import eu.fasten.core.data.metadatadb.MetadataDao; import eu.fasten.core.maven.data.Revision; -import eu.fasten.core.plugins.AbstractKafkaPlugin; import eu.fasten.core.plugins.DBConnector; import eu.fasten.core.plugins.KafkaPlugin; import org.jooq.DSLContext; @@ -17,7 +16,11 @@ import org.slf4j.LoggerFactory; import java.sql.Timestamp; -import java.util.*; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; public class LicenseFeederPlugin extends Plugin { @@ -52,30 +55,23 @@ public void setTopics(List consumeTopics) { @Override public void consume(String record) { - try { // Fasten error-handling guidelines + this.pluginError = null; - this.pluginError = null; + logger.info("License feeder started."); - logger.info("License feeder started."); + // Retrieving coordinates of the input record + Revision coordinates = extractMavenCoordinates(record); + logger.info("Input coordinates: " + coordinates + "."); - // Retrieving coordinates of the input record - Revision coordinates = extractMavenCoordinates(record); - logger.info("Input coordinates: " + coordinates + "."); - - // Inserting detected outbound into the database - var metadataDao = new MetadataDao(dslContext); - dslContext.transaction(transaction -> { - metadataDao.setContext(DSL.using(transaction)); - insertOutboundLicenses(coordinates, record, metadataDao); - insertFileLicenses(coordinates, record, metadataDao); - }); - - // TODO Inserting licenses in files + // Inserting detected outbound into the database + var metadataDao = new MetadataDao(dslContext); + dslContext.transaction(transaction -> { + metadataDao.setContext(DSL.using(transaction)); + insertOutboundLicenses(coordinates, record, metadataDao); + insertFileLicenses(coordinates, record, metadataDao); + }); - } catch (Exception e) { // Fasten error-handling guidelines - logger.error(e.getMessage(), e.getCause()); - setPluginError(e); - } + // TODO Inserting licenses in files } /** diff --git a/analyzer/metadata-plugin/src/main/java/eu/fasten/analyzer/metadataplugin/MetadataDBExtension.java b/analyzer/metadata-plugin/src/main/java/eu/fasten/analyzer/metadataplugin/MetadataDBExtension.java index d6e9b3a1e..659811a57 100644 --- a/analyzer/metadata-plugin/src/main/java/eu/fasten/analyzer/metadataplugin/MetadataDBExtension.java +++ b/analyzer/metadata-plugin/src/main/java/eu/fasten/analyzer/metadataplugin/MetadataDBExtension.java @@ -18,14 +18,18 @@ package eu.fasten.analyzer.metadataplugin; -import eu.fasten.core.data.*; +import eu.fasten.core.data.Constants; +import eu.fasten.core.data.PartialCCallGraph; +import eu.fasten.core.data.PartialCallGraph; +import eu.fasten.core.data.PartialJavaCallGraph; +import eu.fasten.core.data.PartialPythonCallGraph; import eu.fasten.core.data.callableindex.ExtendedGidGraph; -import eu.fasten.core.data.callableindex.GidGraph; import eu.fasten.core.data.metadatadb.MetadataDao; import eu.fasten.core.data.metadatadb.codegen.enums.Access; import eu.fasten.core.data.metadatadb.codegen.enums.CallableType; import eu.fasten.core.data.metadatadb.codegen.tables.records.CallSitesRecord; import eu.fasten.core.data.metadatadb.codegen.tables.records.CallablesRecord; +import eu.fasten.core.exceptions.UnrecoverableError; import eu.fasten.core.maven.utils.MavenUtilities; import eu.fasten.core.plugins.DBConnector; import eu.fasten.core.plugins.KafkaPlugin; @@ -49,7 +53,11 @@ import java.nio.file.Paths; import java.sql.BatchUpdateException; import java.sql.Timestamp; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; public abstract class MetadataDBExtension implements KafkaPlugin, DBConnector { @@ -165,9 +173,16 @@ public void consume(String record) { setOutputPath(callgraph); } }); + } catch (DataAccessException e) { + transactionRestartCount++; + if (transactionRestartCount >= Constants.transactionRestartLimit) { + throw new UnrecoverableError("Could not connect to or query the Postgres DB and the plug-in should be stopped and restarted.", + e.getCause()); + } } catch (Exception expected) { + transactionRestartCount++; } - transactionRestartCount++; + } while (restartTransaction && !processedRecord && transactionRestartCount < Constants.transactionRestartLimit); } diff --git a/analyzer/python-license-feeder/src/main/java/eu/fasten/analyzer/pythonlicensefeeder/PythonLicenseFeederPlugin.java b/analyzer/python-license-feeder/src/main/java/eu/fasten/analyzer/pythonlicensefeeder/PythonLicenseFeederPlugin.java index a79d47e97..acfaffe04 100644 --- a/analyzer/python-license-feeder/src/main/java/eu/fasten/analyzer/pythonlicensefeeder/PythonLicenseFeederPlugin.java +++ b/analyzer/python-license-feeder/src/main/java/eu/fasten/analyzer/pythonlicensefeeder/PythonLicenseFeederPlugin.java @@ -2,8 +2,6 @@ import eu.fasten.core.data.Constants; import eu.fasten.core.data.metadatadb.MetadataDao; -import eu.fasten.core.maven.data.Revision; -import eu.fasten.core.plugins.AbstractKafkaPlugin; import eu.fasten.core.plugins.DBConnector; import eu.fasten.core.plugins.KafkaPlugin; import org.jooq.DSLContext; @@ -16,8 +14,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Timestamp; -import java.util.*; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; public class PythonLicenseFeederPlugin extends Plugin { @@ -53,32 +54,24 @@ public void setTopics(List consumeTopics) { @Override public void consume(String record) { - try { // Fasten error-handling guidelines + this.pluginError = null; - this.pluginError = null; + logger.info("License feeder started."); - logger.info("License feeder started."); + String packageName = extractPackageName(record); + String packageVersion = extractPackageVersion(record); + String sourcePath = extractSourcePath(record); - String packageName = extractPackageName(record); - String packageVersion = extractPackageVersion(record); - String sourcePath = extractSourcePath(record); + logger.info("Package name: " + packageName + "."); + logger.info("Package version: " + packageVersion + "."); - logger.info("Package name: " + packageName + "."); - logger.info("Package version: " + packageVersion + "."); - - // Inserting detected outbound into the database - var metadataDao = new MetadataDao(dslContext); - dslContext.transaction(transaction -> { - metadataDao.setContext(DSL.using(transaction)); - insertOutboundLicenses(packageName, packageVersion, record, metadataDao); - insertFileLicenses(packageName, packageVersion, sourcePath, record, metadataDao); - }); - - - } catch (Exception e) { // Fasten error-handling guidelines - logger.error(e.getMessage(), e.getCause()); - setPluginError(e); - } + // Inserting detected outbound into the database + var metadataDao = new MetadataDao(dslContext); + dslContext.transaction(transaction -> { + metadataDao.setContext(DSL.using(transaction)); + insertOutboundLicenses(packageName, packageVersion, record, metadataDao); + insertFileLicenses(packageName, packageVersion, sourcePath, record, metadataDao); + }); } protected String extractPackageName(String record) { diff --git a/analyzer/quality-analyzer/src/main/java/eu/fasten/analyzer/qualityanalyzer/QualityAnalyzerPlugin.java b/analyzer/quality-analyzer/src/main/java/eu/fasten/analyzer/qualityanalyzer/QualityAnalyzerPlugin.java index c8d076e7b..2d027fb16 100644 --- a/analyzer/quality-analyzer/src/main/java/eu/fasten/analyzer/qualityanalyzer/QualityAnalyzerPlugin.java +++ b/analyzer/quality-analyzer/src/main/java/eu/fasten/analyzer/qualityanalyzer/QualityAnalyzerPlugin.java @@ -20,6 +20,7 @@ import eu.fasten.analyzer.qualityanalyzer.data.QAConstants; import eu.fasten.core.data.Constants; +import eu.fasten.core.exceptions.UnrecoverableError; import eu.fasten.core.plugins.DBConnector; import eu.fasten.core.plugins.KafkaPlugin; import org.jooq.DSLContext; @@ -32,7 +33,11 @@ import org.slf4j.LoggerFactory; import java.sql.BatchUpdateException; -import java.util.*; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; public class QualityAnalyzerPlugin extends Plugin { @@ -96,21 +101,30 @@ public void consume(String kafkaMessage) { do { logger.info("Beginning of the transaction sequence"); setPluginError(null); + transactionRestartCount++; + try { recordId = utils.processJsonRecord(jsonRecord); - if(recordId == null) { + if (recordId == null) { throw new IllegalStateException("No callables matched"); } } - catch(DataAccessException e) { + // Database-related errors + catch (DataAccessException e) { logger.info("Data access exception"); - // Database connection error + + if (transactionRestartCount >= Constants.transactionRestartLimit) { + throw new UnrecoverableError("Could not connect to or query the Postgres DB and the plug-in should be stopped and restarted.", + e.getCause()); + } + if (e.getCause() instanceof BatchUpdateException) { var exception = ((BatchUpdateException) e.getCause()) .getNextException(); setPluginError(exception); } + setPluginError(e); logger.info("Restarting transaction for '" + recordId + "'"); // It could be a deadlock, so restart transaction restartTransaction = true; @@ -136,8 +150,6 @@ public void consume(String kafkaMessage) { + "with callable id = " + recordId); } - transactionRestartCount++; - } while( restartTransaction && !processedRecord && transactionRestartCount < Constants.transactionRestartLimit ); } diff --git a/analyzer/vulnerability-packages-listener/src/main/java/eu/fasten/analyzer/vulnerabilitypackageslistener/VulnerabilityPackagesListener.java b/analyzer/vulnerability-packages-listener/src/main/java/eu/fasten/analyzer/vulnerabilitypackageslistener/VulnerabilityPackagesListener.java index c8ee44b3c..da12303c1 100644 --- a/analyzer/vulnerability-packages-listener/src/main/java/eu/fasten/analyzer/vulnerabilitypackageslistener/VulnerabilityPackagesListener.java +++ b/analyzer/vulnerability-packages-listener/src/main/java/eu/fasten/analyzer/vulnerabilitypackageslistener/VulnerabilityPackagesListener.java @@ -109,27 +109,18 @@ public void setTopics(List consumeTopics) { @Override public void consume(String record) { setPluginError(null); - try { - statementsProcessor.setDBConnection(contexts); - statementsProcessor.setFastenApiClient(fastenApiClient); - var jsonRecord = new JSONObject(record); - var payload = findPayload(jsonRecord); - if(payload != null) { - var ecosystem = payload.getString("forge"); - var packageName = getPackageName(payload); - var version = payload.getString("version"); - logger.info("Processing package update for forge \"" + ecosystem + "\": " + packageName + ":" + version); - statementsProcessor.updateNewPackageVersion(ecosystem, packageName, version); - } - else { - logger.error("Could not parse payload in message: " + record); - } - } - catch (Exception e) { - var error = "Error processing package update: " + e; - logger.error(error); - setPluginError(e); - throw(e); + statementsProcessor.setDBConnection(contexts); + statementsProcessor.setFastenApiClient(fastenApiClient); + var jsonRecord = new JSONObject(record); + var payload = findPayload(jsonRecord); + if (payload != null) { + var ecosystem = payload.getString("forge"); + var packageName = getPackageName(payload); + var version = payload.getString("version"); + logger.info("Processing package update for forge \"" + ecosystem + "\": " + packageName + ":" + version); + statementsProcessor.updateNewPackageVersion(ecosystem, packageName, version); + } else { + logger.error("Could not parse payload in message: " + record); } } diff --git a/analyzer/vulnerability-statements-processor/src/main/java/eu/fasten/analyzer/vulnerabilitystatementsprocessor/VulnerabilityStatementsProcessor.java b/analyzer/vulnerability-statements-processor/src/main/java/eu/fasten/analyzer/vulnerabilitystatementsprocessor/VulnerabilityStatementsProcessor.java index ae831e757..44b73f48d 100644 --- a/analyzer/vulnerability-statements-processor/src/main/java/eu/fasten/analyzer/vulnerabilitystatementsprocessor/VulnerabilityStatementsProcessor.java +++ b/analyzer/vulnerability-statements-processor/src/main/java/eu/fasten/analyzer/vulnerabilitystatementsprocessor/VulnerabilityStatementsProcessor.java @@ -89,14 +89,9 @@ public void setTopics(List consumeTopics) { @Override public void consume(String record) { this.pluginError = null; - try { - var v = updateVulnerability(record); - outputPath = baseOutputPath + File.separator + v.getId() + ".json"; - lastProcessedVulnerability = v; - } catch (Exception e) { - logger.error("Error processing vulnerability statement: " + e); - setPluginError(e); - } + var v = updateVulnerability(record); + outputPath = baseOutputPath + File.separator + v.getId() + ".json"; + lastProcessedVulnerability = v; } @Override diff --git a/server/src/main/java/eu/fasten/server/plugins/kafka/FastenKafkaPlugin.java b/server/src/main/java/eu/fasten/server/plugins/kafka/FastenKafkaPlugin.java index fb082dcc6..1c39b39b7 100644 --- a/server/src/main/java/eu/fasten/server/plugins/kafka/FastenKafkaPlugin.java +++ b/server/src/main/java/eu/fasten/server/plugins/kafka/FastenKafkaPlugin.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import org.jooq.exception.DataAccessException; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -353,6 +354,11 @@ public void processRecord(ConsumerRecord record, ProcessingLane // therefore K8s will restart the plug-in. logger.error("Forced to stop the plug-in due to ", e); throw e; + + } catch (DataAccessException e) { + // Database-related errors + throw new UnrecoverableError("Could not connect to or query the Postgres DB and the plug-in should be stopped and restarted.", + e.getCause()); } catch (Exception e) { logger.error("An error occurred in " + plugin.getClass().getCanonicalName(), e); plugin.setPluginError(e);