Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

general sync high load improvements #135

Merged
merged 2 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.apollographql.apollo3.ApolloClient
import com.apollographql.apollo3.api.ApolloResponse
import com.apollographql.apollo3.api.Mutation
import com.apollographql.apollo3.api.Query
import com.apollographql.apollo3.network.http.HttpInfo
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import gropius.model.architecture.IMSProject
Expand Down Expand Up @@ -36,6 +37,13 @@ import org.springframework.data.neo4j.core.findById
import org.springframework.stereotype.Component
import java.time.OffsetDateTime
import java.util.*
import java.util.concurrent.ConcurrentHashMap

/**
* Exception for GitHub response errors
* @param errors the errors GitHub sent us
*/
class GitHubResponseException(val errors: List<com.apollographql.apollo3.api.Error>) : Exception(errors.toString())

/**
* Service to handle data from GitHub
Expand Down Expand Up @@ -63,6 +71,7 @@ class GithubDataService(

companion object {
const val FALLBACK_USER_NAME = "github"
val TOKEN_WOUND_UP_IN_JAIL = ConcurrentHashMap<String, OffsetDateTime>()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

long term I still think this should be in the db

Copy link
Contributor Author

@chriku chriku Sep 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i agree (partially)

}

/**
Expand Down Expand Up @@ -206,6 +215,27 @@ class GithubDataService(
return label
}

/**
* Check windup timer for token
* @param token the token to check
* @return true if the token is in timeout
*/
suspend fun tokenInTimeout(token: String): Boolean {
if (TOKEN_WOUND_UP_IN_JAIL.contains(token)) {
return TOKEN_WOUND_UP_IN_JAIL[token]!! > OffsetDateTime.now()
}
return false
}

/**
* Windup request blocking timer
* @param token the token to windup
* @param seconds the time to windup
*/
suspend fun windToken(token: String, seconds: Int) {
TOKEN_WOUND_UP_IN_JAIL[token] = OffsetDateTime.now().plusSeconds(seconds.toLong())
}

/**
* Send a mutation to the IMS
*
Expand All @@ -230,14 +260,24 @@ class GithubDataService(
}
logger.info("Requesting with users: $userList")
return tokenManager.executeUntilWorking(imsProject, userList, owner) { token ->
if (tokenInTimeout(token.token!!)) {
return@executeUntilWorking Optional.empty()
}
val apolloClient = ApolloClient.Builder().serverUrl(imsConfig.graphQLUrl.toString())
.addHttpHeader("Authorization", "Bearer ${token.token}").build()
val res = apolloClient.mutation(body).execute()
logger.info("Response Code for request with token $token is ${res.data} ${res.errors}")
val headers = res.executionContext[HttpInfo]?.headers
if ((headers?.firstOrNull { it.name == "x-ratelimit-remaining" }?.value?.toInt() ?: 0) < 100) {
windToken(token.token!!, 3600)
}
if (res.errors?.isNotEmpty() != true) {
Optional.of(res)
} else {
} else if (res.errors?.all { it.nonStandardFields?.get("type") == "RATE_LIMITED" } == true) {
windToken(token.token!!, 10800)
Optional.empty()
} else {
throw GitHubResponseException(res.errors!!)
}
}
}
Expand Down Expand Up @@ -270,14 +310,24 @@ class GithubDataService(
}
logger.info("Requesting with users: $userList ")
return tokenManager.executeUntilWorking(imsProject, userList, listOf()) { token ->
if (tokenInTimeout(token.token!!)) {
return@executeUntilWorking Optional.empty()
}
val apolloClient = ApolloClient.Builder().serverUrl(imsConfig.graphQLUrl.toString())
.addHttpHeader("Authorization", "Bearer ${token.token}").build()
val res = apolloClient.query(body).execute()
logger.info("Response Code for request with token $token is ${res.data} ${res.errors}")
val headers = res.executionContext[HttpInfo]?.headers
if ((headers?.firstOrNull { it.name == "x-ratelimit-remaining" }?.value?.toInt() ?: 0) < 100) {
windToken(token.token!!, 3600)
}
if (res.errors?.isNotEmpty() != true) {
Optional.of(res)
} else {
} else if (res.errors?.all { it.nonStandardFields?.get("type") == "RATE_LIMITED" } == true) {
windToken(token.token!!, 10800)
Optional.empty()
} else {
throw GitHubResponseException(res.errors!!)
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions sync-github/src/main/kotlin/gropius/sync/github/GithubSync.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import gropius.model.issue.Label
import gropius.model.issue.timeline.IssueComment
import gropius.model.template.IMSTemplate
import gropius.model.template.IssueState
import gropius.model.user.GropiusUser
import gropius.model.user.IMSUser
import gropius.model.user.User
import gropius.sync.*
import gropius.sync.github.config.IMSConfigManager
Expand Down Expand Up @@ -127,7 +125,7 @@ final class GithubSync(
imsProject, it.id!!, GitHubResourceWalkerConfig(
CursorResourceWalkerConfig<GithubGithubResourceWalkerBudgetUsageType, GithubGithubResourceWalkerEstimatedBudgetUsageType>(
1.0,
0.1,
0.001,
GithubGithubResourceWalkerEstimatedBudgetUsageType(),
GithubGithubResourceWalkerBudgetUsageType()
), imsProjectConfig.repo.owner, imsProjectConfig.repo.repo, 100
Expand All @@ -142,7 +140,7 @@ final class GithubSync(
imsProject, dirtyIssue.id!!, comment.githubId, GitHubResourceWalkerConfig(
CursorResourceWalkerConfig<GithubGithubResourceWalkerBudgetUsageType, GithubGithubResourceWalkerEstimatedBudgetUsageType>(
1.0,
0.1,
0.001,
GithubGithubResourceWalkerEstimatedBudgetUsageType(),
GithubGithubResourceWalkerBudgetUsageType()
), imsProjectConfig.repo.owner, imsProjectConfig.repo.repo, 100
Expand All @@ -156,7 +154,7 @@ final class GithubSync(
override suspend fun findUnsyncedIssues(imsProject: IMSProject): List<IncomingIssue> {
return issuePileService.findByImsProjectAndHasUnsyncedData(imsProject.rawId!!, true)
}

override suspend fun syncComment(
imsProject: IMSProject, issueId: String, issueComment: IssueComment, users: List<User>
): TimelineItemConversionInformation? {
Expand Down
19 changes: 10 additions & 9 deletions sync/src/main/kotlin/gropius/sync/AbstractSync.kt
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ abstract class AbstractSync(
* Sync Incoming Part
* @param imsProject IMS project to sync
*/
suspend fun doIncoming(imsProject: IMSProject) {
private suspend fun doIncoming(imsProject: IMSProject) {
val dereplicatorRequest = SimpleIssueDereplicatorRequest(
collectedSyncInfo.neoOperations.findAll<GropiusUser>().filter { it.username == "gropius" }.firstOrNull()
?: collectedSyncInfo.neoOperations.save(
Expand All @@ -239,14 +239,15 @@ abstract class AbstractSync(
)
try {
findUnsyncedIssues(imsProject).forEach {
syncIncomingIssue(imsProject, it, dereplicatorRequest)
//} catch (e: SyncNotificator.NotificatedError) {
// syncNotificator.sendNotification(
// imsIssue, SyncNotificator.NotificationDummy(e)
// )
//} catch (e: Exception) {
// logger.warn("Error in issue sync", e)
//}
try {
syncIncomingIssue(imsProject, it, dereplicatorRequest)/*
} catch (e: SyncNotificator.NotificatedError) {
syncNotificator.sendNotification(
imsIssue, SyncNotificator.NotificationDummy(e)
)*/
} catch (e: Exception) {
logger.warn("Exception in issue sync", e)
}
}
} catch (e: SyncNotificator.NotificatedError) {
logger.warn("Error in IMSProject sync", e)
Expand Down
5 changes: 4 additions & 1 deletion sync/src/main/kotlin/gropius/sync/CursorResourceWalker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ abstract class CursorResourceWalker<BudgetUsageType, EstimatedBudgetUsageType, B
}
} else {
cursorResourceWalkerDataService.changePriority(
imsProject, resource, { it + resourceWalkerConfig.priorityIncrease }, resourceWalkerConfig.basePriority
imsProject,
resource,
{ it + resourceWalkerConfig.priorityIncrease + resourceWalkerConfig.priorityIncrease * Math.random() },
resourceWalkerConfig.basePriority
);
}
}
Expand Down
8 changes: 6 additions & 2 deletions sync/src/main/kotlin/gropius/sync/LoadBalancedDataFetcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ class LoadBalancedDataFetcher() : DataFetcher {
val walkers = walkerPairs.sortedBy { it.first }.map { it.second }
for ((walker, imsProject) in walkers) {
logger.info("Executing walker for ${imsProject.rawId!!}")
walker.process()
logger.info("Executed walker for ${imsProject.rawId!!}")
try {
walker.process()
logger.info("Executed walker for ${imsProject.rawId!!}")
} catch (e: Exception) {
logger.warn("Exception in walker for ${imsProject.rawId!!}", e)
}
}
}
}
7 changes: 6 additions & 1 deletion sync/src/main/kotlin/gropius/sync/TokenManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ interface BaseResponseType {
@Serializable
data class LinkImsUserQuery(val imsUserIds: List<String>)

/**
* Exception thrown when no valid token is available
*/
class NoTokenValidException : Exception()

/**
* Manager for token from login service
* @param neoOperations Reference for the spring instance of ReactiveNeo4jOperations
Expand Down Expand Up @@ -209,7 +214,7 @@ abstract class TokenManager<ResponseType : BaseResponseType>(
logger.trace("User $user does not allow sync from $owner")
}
}
TODO("Error Message for no working users")
throw NoTokenValidException()
}

/**
Expand Down