Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle PSQLException in the FASTEN server and MetadataDBExtension #466

Merged
merged 7 commits into from
Jun 8, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

import eu.fasten.core.data.Constants;
import eu.fasten.core.data.metadatadb.MetadataDao;
//This import should probably be different
import eu.fasten.core.maven.data.Revision;
import eu.fasten.core.plugins.AbstractKafkaPlugin;
import eu.fasten.core.plugins.DBConnector;
import eu.fasten.core.plugins.KafkaPlugin;
import org.jooq.DSLContext;
Expand All @@ -17,8 +14,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.util.*;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

//This import should probably be different

public class DebianLicenseFeederPlugin extends Plugin {

Expand Down Expand Up @@ -82,7 +84,7 @@ public void consume(String record) {

} catch (Exception e) { // Fasten error-handling guidelines
logger.error(e.getMessage(), e.getCause());
setPluginError(e);
throw e;
mir-am marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import eu.fasten.core.data.Constants;
import eu.fasten.core.data.metadatadb.MetadataDao;
import eu.fasten.core.maven.data.Revision;
import eu.fasten.core.plugins.AbstractKafkaPlugin;
import eu.fasten.core.plugins.DBConnector;
import eu.fasten.core.plugins.KafkaPlugin;
import org.jooq.DSLContext;
Expand All @@ -17,7 +16,11 @@
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.util.*;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class LicenseFeederPlugin extends Plugin {

Expand Down Expand Up @@ -52,30 +55,23 @@ public void setTopics(List<String> consumeTopics) {

@Override
public void consume(String record) {
try { // Fasten error-handling guidelines
this.pluginError = null;

this.pluginError = null;
logger.info("License feeder started.");

logger.info("License feeder started.");
// Retrieving coordinates of the input record
Revision coordinates = extractMavenCoordinates(record);
logger.info("Input coordinates: " + coordinates + ".");

// Retrieving coordinates of the input record
Revision coordinates = extractMavenCoordinates(record);
logger.info("Input coordinates: " + coordinates + ".");

// Inserting detected outbound into the database
var metadataDao = new MetadataDao(dslContext);
dslContext.transaction(transaction -> {
metadataDao.setContext(DSL.using(transaction));
insertOutboundLicenses(coordinates, record, metadataDao);
insertFileLicenses(coordinates, record, metadataDao);
});

// TODO Inserting licenses in files
// Inserting detected outbound into the database
var metadataDao = new MetadataDao(dslContext);
dslContext.transaction(transaction -> {
metadataDao.setContext(DSL.using(transaction));
insertOutboundLicenses(coordinates, record, metadataDao);
insertFileLicenses(coordinates, record, metadataDao);
});

} catch (Exception e) { // Fasten error-handling guidelines
logger.error(e.getMessage(), e.getCause());
setPluginError(e);
}
// TODO Inserting licenses in files
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@

package eu.fasten.analyzer.metadataplugin;

import eu.fasten.core.data.*;
import eu.fasten.core.data.Constants;
import eu.fasten.core.data.PartialCCallGraph;
import eu.fasten.core.data.PartialCallGraph;
import eu.fasten.core.data.PartialJavaCallGraph;
import eu.fasten.core.data.PartialPythonCallGraph;
import eu.fasten.core.data.callableindex.ExtendedGidGraph;
import eu.fasten.core.data.callableindex.GidGraph;
import eu.fasten.core.data.metadatadb.MetadataDao;
import eu.fasten.core.data.metadatadb.codegen.enums.Access;
import eu.fasten.core.data.metadatadb.codegen.enums.CallableType;
import eu.fasten.core.data.metadatadb.codegen.tables.records.CallSitesRecord;
import eu.fasten.core.data.metadatadb.codegen.tables.records.CallablesRecord;
import eu.fasten.core.exceptions.UnrecoverableError;
import eu.fasten.core.maven.utils.MavenUtilities;
import eu.fasten.core.plugins.DBConnector;
import eu.fasten.core.plugins.KafkaPlugin;
Expand All @@ -49,7 +53,11 @@
import java.nio.file.Paths;
import java.sql.BatchUpdateException;
import java.sql.Timestamp;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public abstract class MetadataDBExtension implements KafkaPlugin, DBConnector {

Expand Down Expand Up @@ -165,9 +173,16 @@ public void consume(String record) {
setOutputPath(callgraph);
}
});
} catch (DataAccessException e) {
transactionRestartCount++;
if (transactionRestartCount >= Constants.transactionRestartLimit) {
throw new UnrecoverableError("Could not connect to or query the Postgres DB and the plug-in should be stopped and restarted.",
e.getCause());
}
} catch (Exception expected) {
transactionRestartCount++;
}
transactionRestartCount++;

} while (restartTransaction && !processedRecord
&& transactionRestartCount < Constants.transactionRestartLimit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import eu.fasten.core.data.Constants;
import eu.fasten.core.data.metadatadb.MetadataDao;
import eu.fasten.core.maven.data.Revision;
import eu.fasten.core.plugins.AbstractKafkaPlugin;
import eu.fasten.core.plugins.DBConnector;
import eu.fasten.core.plugins.KafkaPlugin;
import org.jooq.DSLContext;
Expand All @@ -16,8 +14,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.util.*;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class PythonLicenseFeederPlugin extends Plugin {

Expand Down Expand Up @@ -53,32 +54,24 @@ public void setTopics(List<String> consumeTopics) {

@Override
public void consume(String record) {
try { // Fasten error-handling guidelines
this.pluginError = null;

this.pluginError = null;
logger.info("License feeder started.");

logger.info("License feeder started.");
String packageName = extractPackageName(record);
String packageVersion = extractPackageVersion(record);
String sourcePath = extractSourcePath(record);

String packageName = extractPackageName(record);
String packageVersion = extractPackageVersion(record);
String sourcePath = extractSourcePath(record);
logger.info("Package name: " + packageName + ".");
logger.info("Package version: " + packageVersion + ".");

logger.info("Package name: " + packageName + ".");
logger.info("Package version: " + packageVersion + ".");

// Inserting detected outbound into the database
var metadataDao = new MetadataDao(dslContext);
dslContext.transaction(transaction -> {
metadataDao.setContext(DSL.using(transaction));
insertOutboundLicenses(packageName, packageVersion, record, metadataDao);
insertFileLicenses(packageName, packageVersion, sourcePath, record, metadataDao);
});


} catch (Exception e) { // Fasten error-handling guidelines
logger.error(e.getMessage(), e.getCause());
setPluginError(e);
}
// Inserting detected outbound into the database
var metadataDao = new MetadataDao(dslContext);
dslContext.transaction(transaction -> {
metadataDao.setContext(DSL.using(transaction));
insertOutboundLicenses(packageName, packageVersion, record, metadataDao);
insertFileLicenses(packageName, packageVersion, sourcePath, record, metadataDao);
});
}

protected String extractPackageName(String record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import eu.fasten.analyzer.qualityanalyzer.data.QAConstants;
import eu.fasten.core.data.Constants;
import eu.fasten.core.exceptions.UnrecoverableError;
import eu.fasten.core.plugins.DBConnector;
import eu.fasten.core.plugins.KafkaPlugin;
import org.jooq.DSLContext;
Expand All @@ -32,7 +33,11 @@
import org.slf4j.LoggerFactory;

import java.sql.BatchUpdateException;
import java.util.*;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;


public class QualityAnalyzerPlugin extends Plugin {
Expand Down Expand Up @@ -96,21 +101,30 @@ public void consume(String kafkaMessage) {
do {
logger.info("Beginning of the transaction sequence");
setPluginError(null);
transactionRestartCount++;

try {
recordId = utils.processJsonRecord(jsonRecord);
if(recordId == null) {
if (recordId == null) {
throw new IllegalStateException("No callables matched");
}
}
catch(DataAccessException e) {
// Database-related errors
catch (DataAccessException e) {
logger.info("Data access exception");
// Database connection error

if (transactionRestartCount >= Constants.transactionRestartLimit) {
throw new UnrecoverableError("Could not connect to or query the Postgres DB and the plug-in should be stopped and restarted.",
e.getCause());
}

if (e.getCause() instanceof BatchUpdateException) {
var exception = ((BatchUpdateException) e.getCause())
.getNextException();
setPluginError(exception);
}

setPluginError(e);
logger.info("Restarting transaction for '" + recordId + "'");
// It could be a deadlock, so restart transaction
restartTransaction = true;
Expand All @@ -136,8 +150,6 @@ public void consume(String kafkaMessage) {
+ "with callable id = " + recordId);
}

transactionRestartCount++;

} while( restartTransaction && !processedRecord && transactionRestartCount < Constants.transactionRestartLimit );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,27 +109,18 @@ public void setTopics(List<String> consumeTopics) {
@Override
public void consume(String record) {
setPluginError(null);
try {
statementsProcessor.setDBConnection(contexts);
statementsProcessor.setFastenApiClient(fastenApiClient);
var jsonRecord = new JSONObject(record);
var payload = findPayload(jsonRecord);
if(payload != null) {
var ecosystem = payload.getString("forge");
var packageName = getPackageName(payload);
var version = payload.getString("version");
logger.info("Processing package update for forge \"" + ecosystem + "\": " + packageName + ":" + version);
statementsProcessor.updateNewPackageVersion(ecosystem, packageName, version);
}
else {
logger.error("Could not parse payload in message: " + record);
}
}
catch (Exception e) {
var error = "Error processing package update: " + e;
logger.error(error);
setPluginError(e);
throw(e);
statementsProcessor.setDBConnection(contexts);
statementsProcessor.setFastenApiClient(fastenApiClient);
var jsonRecord = new JSONObject(record);
var payload = findPayload(jsonRecord);
if (payload != null) {
var ecosystem = payload.getString("forge");
var packageName = getPackageName(payload);
var version = payload.getString("version");
logger.info("Processing package update for forge \"" + ecosystem + "\": " + packageName + ":" + version);
statementsProcessor.updateNewPackageVersion(ecosystem, packageName, version);
} else {
logger.error("Could not parse payload in message: " + record);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,9 @@ public void setTopics(List<String> consumeTopics) {
@Override
public void consume(String record) {
this.pluginError = null;
try {
var v = updateVulnerability(record);
outputPath = baseOutputPath + File.separator + v.getId() + ".json";
lastProcessedVulnerability = v;
} catch (Exception e) {
logger.error("Error processing vulnerability statement: " + e);
setPluginError(e);
}
var v = updateVulnerability(record);
outputPath = baseOutputPath + File.separator + v.getId() + ".json";
lastProcessedVulnerability = v;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.jooq.exception.DataAccessException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -353,6 +354,11 @@ public void processRecord(ConsumerRecord<String, String> record, ProcessingLane
// therefore K8s will restart the plug-in.
logger.error("Forced to stop the plug-in due to ", e);
throw e;

} catch (DataAccessException e) {
// Database-related errors
throw new UnrecoverableError("Could not connect to or query the Postgres DB and the plug-in should be stopped and restarted.",
e.getCause());
} catch (Exception e) {
logger.error("An error occurred in " + plugin.getClass().getCanonicalName(), e);
plugin.setPluginError(e);
Expand Down