diff --git a/glific/src/main/java/org/avni_integration_service/glific/bigQuery/BigQueryClient.java b/glific/src/main/java/org/avni_integration_service/glific/bigQuery/BigQueryClient.java index 3b875b8d..fbc6f1ab 100644 --- a/glific/src/main/java/org/avni_integration_service/glific/bigQuery/BigQueryClient.java +++ b/glific/src/main/java/org/avni_integration_service/glific/bigQuery/BigQueryClient.java @@ -32,10 +32,10 @@ public Iterator getResults(String query, String date, int limit, BigQuery return new BigQueryResultsMapper().map(tableResult, resultMapper); } - public FlowResult getResult(String query, FlowResultMapper flowResultMapper, String paramName, String paramValue) { + public FlowResult getResult(String query, FlowResultMapper flowResultMapper, String paramName, long paramValue) { QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query) - .addNamedParameter(paramName, QueryParameterValue.string(paramValue)) + .addNamedParameter(paramName, QueryParameterValue.int64(paramValue)) .build(); TableResult tableResult = run(queryConfig); FieldValueList fieldValues = StreamSupport.stream(tableResult.getValues().spliterator(), false).findAny().orElse(null); diff --git a/integration-data/src/main/java/org/avni_integration_service/integration_data/repository/ErrorRecordRepository.java b/integration-data/src/main/java/org/avni_integration_service/integration_data/repository/ErrorRecordRepository.java index 886a9d2b..7de701ec 100644 --- a/integration-data/src/main/java/org/avni_integration_service/integration_data/repository/ErrorRecordRepository.java +++ b/integration-data/src/main/java/org/avni_integration_service/integration_data/repository/ErrorRecordRepository.java @@ -38,8 +38,8 @@ private IntegrationSystem getIntegrationSystem() { return RepositoryProvider.getIntegrationSystemRepository().findById(IntegrationContext.getIntegrationSystemId()).get(); } - default Stream getProcessableErrorRecords() { + default List getProcessableErrorRecords() { return findAllByProcessingDisabledFalseAndIntegrationSystem(getIntegrationSystem()); } - Stream findAllByProcessingDisabledFalseAndIntegrationSystem(IntegrationSystem integrationSystem); + List findAllByProcessingDisabledFalseAndIntegrationSystem(IntegrationSystem integrationSystem); } diff --git a/lahi/src/main/java/org/avni_integration_service/lahi/config/LahiErrorType.java b/lahi/src/main/java/org/avni_integration_service/lahi/config/LahiErrorType.java index c6733b42..0f615ab6 100644 --- a/lahi/src/main/java/org/avni_integration_service/lahi/config/LahiErrorType.java +++ b/lahi/src/main/java/org/avni_integration_service/lahi/config/LahiErrorType.java @@ -1,6 +1,15 @@ package org.avni_integration_service.lahi.config; +import org.avni_integration_service.common.PlatformException; +import org.avni_integration_service.common.UnknownException; + public enum LahiErrorType { CommonError, - PlatformError + PlatformError; + + public static LahiErrorType getErrorType(Exception e) { + if (e instanceof PlatformException) return PlatformError; + if (e instanceof UnknownException) return CommonError; + return null; + } } diff --git a/lahi/src/main/java/org/avni_integration_service/lahi/service/LahiStudentService.java b/lahi/src/main/java/org/avni_integration_service/lahi/service/LahiStudentService.java index 59a6eeec..501d576f 100644 --- a/lahi/src/main/java/org/avni_integration_service/lahi/service/LahiStudentService.java +++ b/lahi/src/main/java/org/avni_integration_service/lahi/service/LahiStudentService.java @@ -30,17 +30,12 @@ public class LahiStudentService { """; public static final String STUDENT_FETCH_QUERY = """ - select c.phone, fr.results, s.inserted_at - from `glific-lms-lahi.918956411022.contacts` c, UNNEST(c.fields) AS s - join `glific-lms-lahi.918956411022.flow_results` fr - on fr.contact_phone = c.phone + select fr.contact_phone, fr.results, fr.id as flowresult_id, fr.inserted_at, fr.updated_at + from `glific-lms-lahi.918956411022.flow_results` fr WHERE - (s.label, s.value) = ('avni_reg_complete', 'Yes') - AND fr.name = 'Avni Students Registrations Flow' AND fr.id = @flowResultId - order by s.inserted_at desc limit 1 """; @@ -55,12 +50,13 @@ public LahiStudentService(BigQueryClient bigQueryClient, public Students getStudents() { String fetchTime = integratingEntityStatusRepository.find(STUDENT_ENTITY_TYPE).getReadUptoDateTime().toString(); + logger.info(String.format("Getting students since %s", fetchTime)); Iterator results = bigQueryClient.getResults(BULK_FETCH_QUERY, fetchTime, LIMIT, new FlowResultMapper()); return new Students(results); } public Student getStudent(String id) { - FlowResult flowResult = bigQueryClient.getResult(STUDENT_FETCH_QUERY, new FlowResultMapper(), "flowResultId", id); + FlowResult flowResult = bigQueryClient.getResult(STUDENT_FETCH_QUERY, new FlowResultMapper(), "flowResultId", Long.parseLong(id)); return new Student(flowResult); } } diff --git a/lahi/src/main/java/org/avni_integration_service/lahi/service/StudentErrorService.java b/lahi/src/main/java/org/avni_integration_service/lahi/service/StudentErrorService.java index ab7461f9..90a9c77a 100644 --- a/lahi/src/main/java/org/avni_integration_service/lahi/service/StudentErrorService.java +++ b/lahi/src/main/java/org/avni_integration_service/lahi/service/StudentErrorService.java @@ -25,23 +25,20 @@ public StudentErrorService(ErrorRecordRepository errorRecordRepository, Integrat this.errorTypeRepository = errorTypeRepository; } - public void platformError(Student lahiStudent, Throwable throwable) { - saveStudentError(lahiStudent, throwable, LahiErrorType.PlatformError); - } - - public void studentProcessingError(Student lahiStudent, Throwable throwable) { - saveStudentError(lahiStudent, throwable, LahiErrorType.CommonError); + public void saveStudentError(Student lahiStudent, Exception exception) { + ErrorRecord errorRecord = getErrorRecord(lahiStudent); + saveStudentError(lahiStudent, exception, errorRecord); } - private ErrorRecord saveStudentError(Student lahiStudent, Throwable throwable, LahiErrorType lahiErrorType) { - ErrorRecord errorRecord = getErrorRecord(lahiStudent); - ErrorType errorType = getErrorType(lahiErrorType); - String errorMsg = throwable.getMessage(); + public void saveStudentError(Student lahiStudent, Exception exception, ErrorRecord errorRecord) { + ErrorType errorType = getErrorType(LahiErrorType.getErrorType(exception)); + String errorMsg = exception.getMessage(); String lahiEntityType = LahiEntityType.Student.name(); String flowResultId = lahiStudent.getFlowResultId(); if (errorRecord != null) { - logger.info(String.format("Same error as the last processing for entity flowResultId %s, and type %s", flowResultId, lahiEntityType)); - if (!errorRecord.hasThisAsLastErrorTypeAndErrorMessage(errorType, errorMsg)) + if (errorRecord.hasThisAsLastErrorTypeAndErrorMessage(errorType, errorMsg)) + logger.info(String.format("Same error as the last processing for entity flowResultId %s, and type %s", flowResultId, lahiEntityType)); + else errorRecord.addErrorLog(errorType, errorMsg); } else { errorRecord = new ErrorRecord(); @@ -51,7 +48,7 @@ private ErrorRecord saveStudentError(Student lahiStudent, Throwable throwable, L errorRecord.addErrorLog(errorType, errorMsg); errorRecord.setProcessingDisabled(false); } - return errorRecordRepository.save(errorRecord); + errorRecordRepository.save(errorRecord); } private ErrorRecord getErrorRecord(Student lahiStudent) { diff --git a/lahi/src/main/java/org/avni_integration_service/lahi/worker/StudentWorker.java b/lahi/src/main/java/org/avni_integration_service/lahi/worker/StudentWorker.java index bf6d3a9d..78191386 100644 --- a/lahi/src/main/java/org/avni_integration_service/lahi/worker/StudentWorker.java +++ b/lahi/src/main/java/org/avni_integration_service/lahi/worker/StudentWorker.java @@ -39,11 +39,11 @@ public void processStudents() { lahiIntegrationDataService.updateSyncStatus(student); } catch (PlatformException e) { logger.error("Platform level issue. Adding to error record.", e); - studentErrorService.platformError(student, e); + studentErrorService.saveStudentError(student, e); lahiIntegrationDataService.updateSyncStatus(student); } catch (UnknownException e) { logger.error("Unknown error. Adding to error record.", e); - studentErrorService.studentProcessingError(student, e); + studentErrorService.saveStudentError(student, e); lahiIntegrationDataService.updateSyncStatus(student); } catch (MessageUnprocessableException e) { logger.warn(String.format("Problem with message. Continue processing. %s", e.getMessage())); @@ -69,6 +69,7 @@ public void processErrors() { studentErrorService.processed(errorRecord, false); } catch (PlatformException | UnknownException e) { logger.error("Platform level issue again.", e); + studentErrorService.saveStudentError(student, e, errorRecord); } }); } diff --git a/lahi/src/test/java/org/avni_integration_service/lahi/config/LahiErrorTypeTest.java b/lahi/src/test/java/org/avni_integration_service/lahi/config/LahiErrorTypeTest.java new file mode 100644 index 00000000..9f7c568f --- /dev/null +++ b/lahi/src/test/java/org/avni_integration_service/lahi/config/LahiErrorTypeTest.java @@ -0,0 +1,15 @@ +package org.avni_integration_service.lahi.config; + +import org.avni_integration_service.common.PlatformException; +import org.avni_integration_service.common.UnknownException; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class LahiErrorTypeTest { + @Test + void getErrorType() { + assertEquals(LahiErrorType.PlatformError, LahiErrorType.getErrorType(new PlatformException(""))); + assertEquals(LahiErrorType.CommonError, LahiErrorType.getErrorType(new UnknownException(new NullPointerException()))); + } +}