From 1304630907751dec10e167c2e3442345253b6893 Mon Sep 17 00:00:00 2001 From: Christian Kurz Date: Wed, 25 Sep 2024 05:03:24 +0200 Subject: [PATCH 1/2] general sync high load improvements --- .../gropius/sync/github/GithubDataService.kt | 50 ++++++++++++++++++- .../kotlin/gropius/sync/github/GithubSync.kt | 8 ++- .../main/kotlin/gropius/sync/AbstractSync.kt | 19 +++---- .../gropius/sync/CursorResourceWalker.kt | 5 +- .../gropius/sync/LoadBalancedDataFetcher.kt | 8 ++- .../main/kotlin/gropius/sync/TokenManager.kt | 7 ++- 6 files changed, 77 insertions(+), 20 deletions(-) diff --git a/sync-github/src/main/kotlin/gropius/sync/github/GithubDataService.kt b/sync-github/src/main/kotlin/gropius/sync/github/GithubDataService.kt index 9fcc4153..c8d104a5 100644 --- a/sync-github/src/main/kotlin/gropius/sync/github/GithubDataService.kt +++ b/sync-github/src/main/kotlin/gropius/sync/github/GithubDataService.kt @@ -4,6 +4,7 @@ import com.apollographql.apollo3.ApolloClient import com.apollographql.apollo3.api.ApolloResponse import com.apollographql.apollo3.api.Mutation import com.apollographql.apollo3.api.Query +import com.apollographql.apollo3.network.http.HttpInfo import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.ObjectMapper import gropius.model.architecture.IMSProject @@ -36,6 +37,9 @@ import org.springframework.data.neo4j.core.findById import org.springframework.stereotype.Component import java.time.OffsetDateTime import java.util.* +import java.util.concurrent.ConcurrentHashMap + +class GitHubResponseException(val errors: List) : Exception(errors.toString()) /** * Service to handle data from GitHub @@ -63,6 +67,7 @@ class GithubDataService( companion object { const val FALLBACK_USER_NAME = "github" + val TOKEN_WOUND_UP_IN_JAIL = ConcurrentHashMap() } /** @@ -206,6 +211,27 @@ class GithubDataService( return label } + /** + * Check windup timer for token + * @param token the token to check + * @return true if the token is in timeout + */ + suspend fun tokenInTimeout(token: String): Boolean { + if (TOKEN_WOUND_UP_IN_JAIL.contains(token)) { + return TOKEN_WOUND_UP_IN_JAIL[token]!! > OffsetDateTime.now() + } + return false + } + + /** + * Windup request blocking timer + * @param token the token to windup + * @param seconds the time to windup + */ + suspend fun windToken(token: String, seconds: Int) { + TOKEN_WOUND_UP_IN_JAIL[token] = OffsetDateTime.now().plusSeconds(seconds.toLong()) + } + /** * Send a mutation to the IMS * @@ -230,14 +256,24 @@ class GithubDataService( } logger.info("Requesting with users: $userList") return tokenManager.executeUntilWorking(imsProject, userList, owner) { token -> + if (tokenInTimeout(token.token!!)) { + return@executeUntilWorking Optional.empty() + } val apolloClient = ApolloClient.Builder().serverUrl(imsConfig.graphQLUrl.toString()) .addHttpHeader("Authorization", "Bearer ${token.token}").build() val res = apolloClient.mutation(body).execute() logger.info("Response Code for request with token $token is ${res.data} ${res.errors}") + val headers = res.executionContext[HttpInfo]?.headers + if ((headers?.firstOrNull { it.name == "x-ratelimit-remaining" }?.value?.toInt() ?: 0) < 100) { + windToken(token.token!!, 3600) + } if (res.errors?.isNotEmpty() != true) { Optional.of(res) - } else { + } else if (res.errors?.all { it.nonStandardFields?.get("type") == "RATE_LIMITED" } == true) { + windToken(token.token!!, 10800) Optional.empty() + } else { + throw GitHubResponseException(res.errors!!) } } } @@ -270,14 +306,24 @@ class GithubDataService( } logger.info("Requesting with users: $userList ") return tokenManager.executeUntilWorking(imsProject, userList, listOf()) { token -> + if (tokenInTimeout(token.token!!)) { + return@executeUntilWorking Optional.empty() + } val apolloClient = ApolloClient.Builder().serverUrl(imsConfig.graphQLUrl.toString()) .addHttpHeader("Authorization", "Bearer ${token.token}").build() val res = apolloClient.query(body).execute() logger.info("Response Code for request with token $token is ${res.data} ${res.errors}") + val headers = res.executionContext[HttpInfo]?.headers + if ((headers?.firstOrNull { it.name == "x-ratelimit-remaining" }?.value?.toInt() ?: 0) < 100) { + windToken(token.token!!, 3600) + } if (res.errors?.isNotEmpty() != true) { Optional.of(res) - } else { + } else if (res.errors?.all { it.nonStandardFields?.get("type") == "RATE_LIMITED" } == true) { + windToken(token.token!!, 10800) Optional.empty() + } else { + throw GitHubResponseException(res.errors!!) } } } diff --git a/sync-github/src/main/kotlin/gropius/sync/github/GithubSync.kt b/sync-github/src/main/kotlin/gropius/sync/github/GithubSync.kt index 1bcee93a..b719eb11 100644 --- a/sync-github/src/main/kotlin/gropius/sync/github/GithubSync.kt +++ b/sync-github/src/main/kotlin/gropius/sync/github/GithubSync.kt @@ -6,8 +6,6 @@ import gropius.model.issue.Label import gropius.model.issue.timeline.IssueComment import gropius.model.template.IMSTemplate import gropius.model.template.IssueState -import gropius.model.user.GropiusUser -import gropius.model.user.IMSUser import gropius.model.user.User import gropius.sync.* import gropius.sync.github.config.IMSConfigManager @@ -127,7 +125,7 @@ final class GithubSync( imsProject, it.id!!, GitHubResourceWalkerConfig( CursorResourceWalkerConfig( 1.0, - 0.1, + 0.001, GithubGithubResourceWalkerEstimatedBudgetUsageType(), GithubGithubResourceWalkerBudgetUsageType() ), imsProjectConfig.repo.owner, imsProjectConfig.repo.repo, 100 @@ -142,7 +140,7 @@ final class GithubSync( imsProject, dirtyIssue.id!!, comment.githubId, GitHubResourceWalkerConfig( CursorResourceWalkerConfig( 1.0, - 0.1, + 0.001, GithubGithubResourceWalkerEstimatedBudgetUsageType(), GithubGithubResourceWalkerBudgetUsageType() ), imsProjectConfig.repo.owner, imsProjectConfig.repo.repo, 100 @@ -156,7 +154,7 @@ final class GithubSync( override suspend fun findUnsyncedIssues(imsProject: IMSProject): List { return issuePileService.findByImsProjectAndHasUnsyncedData(imsProject.rawId!!, true) } - + override suspend fun syncComment( imsProject: IMSProject, issueId: String, issueComment: IssueComment, users: List ): TimelineItemConversionInformation? { diff --git a/sync/src/main/kotlin/gropius/sync/AbstractSync.kt b/sync/src/main/kotlin/gropius/sync/AbstractSync.kt index 69322bab..5f188eb5 100644 --- a/sync/src/main/kotlin/gropius/sync/AbstractSync.kt +++ b/sync/src/main/kotlin/gropius/sync/AbstractSync.kt @@ -228,7 +228,7 @@ abstract class AbstractSync( * Sync Incoming Part * @param imsProject IMS project to sync */ - suspend fun doIncoming(imsProject: IMSProject) { + private suspend fun doIncoming(imsProject: IMSProject) { val dereplicatorRequest = SimpleIssueDereplicatorRequest( collectedSyncInfo.neoOperations.findAll().filter { it.username == "gropius" }.firstOrNull() ?: collectedSyncInfo.neoOperations.save( @@ -239,14 +239,15 @@ abstract class AbstractSync( ) try { findUnsyncedIssues(imsProject).forEach { - syncIncomingIssue(imsProject, it, dereplicatorRequest) - //} catch (e: SyncNotificator.NotificatedError) { - // syncNotificator.sendNotification( - // imsIssue, SyncNotificator.NotificationDummy(e) - // ) - //} catch (e: Exception) { - // logger.warn("Error in issue sync", e) - //} + try { + syncIncomingIssue(imsProject, it, dereplicatorRequest)/* + } catch (e: SyncNotificator.NotificatedError) { + syncNotificator.sendNotification( + imsIssue, SyncNotificator.NotificationDummy(e) + )*/ + } catch (e: Exception) { + logger.warn("Exception in issue sync", e) + } } } catch (e: SyncNotificator.NotificatedError) { logger.warn("Error in IMSProject sync", e) diff --git a/sync/src/main/kotlin/gropius/sync/CursorResourceWalker.kt b/sync/src/main/kotlin/gropius/sync/CursorResourceWalker.kt index a8aaecfd..c4962c5c 100644 --- a/sync/src/main/kotlin/gropius/sync/CursorResourceWalker.kt +++ b/sync/src/main/kotlin/gropius/sync/CursorResourceWalker.kt @@ -58,7 +58,10 @@ abstract class CursorResourceWalker) +/** + * Exception thrown when no valid token is available + */ +class NoTokenValidException : Exception() + /** * Manager for token from login service * @param neoOperations Reference for the spring instance of ReactiveNeo4jOperations @@ -209,7 +214,7 @@ abstract class TokenManager( logger.trace("User $user does not allow sync from $owner") } } - TODO("Error Message for no working users") + throw NoTokenValidException() } /** From 0f8126b018cc44e55893da587169cc3fd7ed1b1c Mon Sep 17 00:00:00 2001 From: Christian Kurz Date: Wed, 25 Sep 2024 05:09:39 +0200 Subject: [PATCH 2/2] dokka for GitHubResponseException --- .../src/main/kotlin/gropius/sync/github/GithubDataService.kt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sync-github/src/main/kotlin/gropius/sync/github/GithubDataService.kt b/sync-github/src/main/kotlin/gropius/sync/github/GithubDataService.kt index c8d104a5..3d8d64b7 100644 --- a/sync-github/src/main/kotlin/gropius/sync/github/GithubDataService.kt +++ b/sync-github/src/main/kotlin/gropius/sync/github/GithubDataService.kt @@ -39,6 +39,10 @@ import java.time.OffsetDateTime import java.util.* import java.util.concurrent.ConcurrentHashMap +/** + * Exception for GitHub response errors + * @param errors the errors GitHub sent us + */ class GitHubResponseException(val errors: List) : Exception(errors.toString()) /**