Skip to content

Commit

Permalink
#89 - fixed query for getting one item from BQ
Browse files Browse the repository at this point in the history
  • Loading branch information
petmongrels committed Dec 5, 2023
1 parent 797c3ab commit 158aecd
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public <T> Iterator<T> getResults(String query, String date, int limit, BigQuery
return new BigQueryResultsMapper<T>().map(tableResult, resultMapper);
}

public FlowResult getResult(String query, FlowResultMapper flowResultMapper, String paramName, String paramValue) {
public FlowResult getResult(String query, FlowResultMapper flowResultMapper, String paramName, long paramValue) {
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(query)
.addNamedParameter(paramName, QueryParameterValue.string(paramValue))
.addNamedParameter(paramName, QueryParameterValue.int64(paramValue))
.build();
TableResult tableResult = run(queryConfig);
FieldValueList fieldValues = StreamSupport.stream(tableResult.getValues().spliterator(), false).findAny().orElse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ private IntegrationSystem getIntegrationSystem() {
return RepositoryProvider.getIntegrationSystemRepository().findById(IntegrationContext.getIntegrationSystemId()).get();
}

default Stream<ErrorRecord> getProcessableErrorRecords() {
default List<ErrorRecord> getProcessableErrorRecords() {
return findAllByProcessingDisabledFalseAndIntegrationSystem(getIntegrationSystem());
}
Stream<ErrorRecord> findAllByProcessingDisabledFalseAndIntegrationSystem(IntegrationSystem integrationSystem);
List<ErrorRecord> findAllByProcessingDisabledFalseAndIntegrationSystem(IntegrationSystem integrationSystem);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
package org.avni_integration_service.lahi.config;

import org.avni_integration_service.common.PlatformException;
import org.avni_integration_service.common.UnknownException;

public enum LahiErrorType {
CommonError,
PlatformError
PlatformError;

public static LahiErrorType getErrorType(Exception e) {
if (e instanceof PlatformException) return PlatformError;
if (e instanceof UnknownException) return CommonError;
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,12 @@ public class LahiStudentService {
""";

public static final String STUDENT_FETCH_QUERY = """
select c.phone, fr.results, s.inserted_at
from `glific-lms-lahi.918956411022.contacts` c, UNNEST(c.fields) AS s
join `glific-lms-lahi.918956411022.flow_results` fr
on fr.contact_phone = c.phone
select fr.contact_phone, fr.results, fr.id as flowresult_id, fr.inserted_at, fr.updated_at
from `glific-lms-lahi.918956411022.flow_results` fr
WHERE
(s.label, s.value) = ('avni_reg_complete', 'Yes')
AND
fr.name = 'Avni Students Registrations Flow'
AND
fr.id = @flowResultId
order by s.inserted_at desc
limit 1
""";

Expand All @@ -55,12 +50,13 @@ public LahiStudentService(BigQueryClient bigQueryClient,

public Students getStudents() {
String fetchTime = integratingEntityStatusRepository.find(STUDENT_ENTITY_TYPE).getReadUptoDateTime().toString();
logger.info(String.format("Getting students since %s", fetchTime));
Iterator<FlowResult> results = bigQueryClient.getResults(BULK_FETCH_QUERY, fetchTime, LIMIT, new FlowResultMapper());
return new Students(results);
}

public Student getStudent(String id) {
FlowResult flowResult = bigQueryClient.getResult(STUDENT_FETCH_QUERY, new FlowResultMapper(), "flowResultId", id);
FlowResult flowResult = bigQueryClient.getResult(STUDENT_FETCH_QUERY, new FlowResultMapper(), "flowResultId", Long.parseLong(id));
return new Student(flowResult);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,20 @@ public StudentErrorService(ErrorRecordRepository errorRecordRepository, Integrat
this.errorTypeRepository = errorTypeRepository;
}

public void platformError(Student lahiStudent, Throwable throwable) {
saveStudentError(lahiStudent, throwable, LahiErrorType.PlatformError);
}

public void studentProcessingError(Student lahiStudent, Throwable throwable) {
saveStudentError(lahiStudent, throwable, LahiErrorType.CommonError);
public void saveStudentError(Student lahiStudent, Exception exception) {
ErrorRecord errorRecord = getErrorRecord(lahiStudent);
saveStudentError(lahiStudent, exception, errorRecord);
}

private ErrorRecord saveStudentError(Student lahiStudent, Throwable throwable, LahiErrorType lahiErrorType) {
ErrorRecord errorRecord = getErrorRecord(lahiStudent);
ErrorType errorType = getErrorType(lahiErrorType);
String errorMsg = throwable.getMessage();
public void saveStudentError(Student lahiStudent, Exception exception, ErrorRecord errorRecord) {
ErrorType errorType = getErrorType(LahiErrorType.getErrorType(exception));
String errorMsg = exception.getMessage();
String lahiEntityType = LahiEntityType.Student.name();
String flowResultId = lahiStudent.getFlowResultId();
if (errorRecord != null) {
logger.info(String.format("Same error as the last processing for entity flowResultId %s, and type %s", flowResultId, lahiEntityType));
if (!errorRecord.hasThisAsLastErrorTypeAndErrorMessage(errorType, errorMsg))
if (errorRecord.hasThisAsLastErrorTypeAndErrorMessage(errorType, errorMsg))
logger.info(String.format("Same error as the last processing for entity flowResultId %s, and type %s", flowResultId, lahiEntityType));
else
errorRecord.addErrorLog(errorType, errorMsg);
} else {
errorRecord = new ErrorRecord();
Expand All @@ -51,7 +48,7 @@ private ErrorRecord saveStudentError(Student lahiStudent, Throwable throwable, L
errorRecord.addErrorLog(errorType, errorMsg);
errorRecord.setProcessingDisabled(false);
}
return errorRecordRepository.save(errorRecord);
errorRecordRepository.save(errorRecord);
}

private ErrorRecord getErrorRecord(Student lahiStudent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ public void processStudents() {
lahiIntegrationDataService.updateSyncStatus(student);
} catch (PlatformException e) {
logger.error("Platform level issue. Adding to error record.", e);
studentErrorService.platformError(student, e);
studentErrorService.saveStudentError(student, e);
lahiIntegrationDataService.updateSyncStatus(student);
} catch (UnknownException e) {
logger.error("Unknown error. Adding to error record.", e);
studentErrorService.studentProcessingError(student, e);
studentErrorService.saveStudentError(student, e);
lahiIntegrationDataService.updateSyncStatus(student);
} catch (MessageUnprocessableException e) {
logger.warn(String.format("Problem with message. Continue processing. %s", e.getMessage()));
Expand All @@ -69,6 +69,7 @@ public void processErrors() {
studentErrorService.processed(errorRecord, false);
} catch (PlatformException | UnknownException e) {
logger.error("Platform level issue again.", e);
studentErrorService.saveStudentError(student, e, errorRecord);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.avni_integration_service.lahi.config;

import org.avni_integration_service.common.PlatformException;
import org.avni_integration_service.common.UnknownException;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class LahiErrorTypeTest {
@Test
void getErrorType() {
assertEquals(LahiErrorType.PlatformError, LahiErrorType.getErrorType(new PlatformException("")));
assertEquals(LahiErrorType.CommonError, LahiErrorType.getErrorType(new UnknownException(new NullPointerException())));
}
}

0 comments on commit 158aecd

Please sign in to comment.