Skip to content

Commit

Permalink
wip: bq work draft
Browse files Browse the repository at this point in the history
  • Loading branch information
mrehan27 committed Nov 25, 2022
1 parent 0a7df14 commit bce0b18
Show file tree
Hide file tree
Showing 39 changed files with 1,485 additions and 7 deletions.
5 changes: 5 additions & 0 deletions shared/src/androidMain/kotlin/io/customer/shared/util/Util.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.customer.shared.util

import java.util.*

actual fun generateRandomUUID(): String = UUID.randomUUID().toString()
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.customer.shared.common

/**
* Wrapper around Kotlin object to initialize the instance later when accessing the object.
*
* The class currently does not support atomic operations and may initialize the object multiple
* times if accessed from different threads simultaneously.
*
* @param T generic type of object class.
*/
// TODO: Support atomic operations
internal class LazyReference<T : Any>(defaultValue: T? = null) {
// The referenced value, can be null
var value: T? = defaultValue
private set

/**
* Returns current instance if initialized, else assigns it with new value using initializer
* and then returns it.
*/
fun initializeAndGet(initializer: () -> T): T {
return value ?: initializer.invoke().also { value = it }
}

/**
* Clears currently set instance
*/
fun clearInstance() {
value = null
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.customer.shared.common

typealias CustomAttributesCompat = Map<String, Any>
typealias QueueTaskResult = Boolean

/**
* Code below this line is a temporary work around for the issue raised by Kotlin serialization.
Expand All @@ -16,7 +17,7 @@ typealias CustomAttributesCompat = Map<String, Any>
* typealias CustomAttributes = Map<String, Any>
* ************************************************
*/
// TODO: Add support for generic serialization
// TODO: Add support for generic and optional serialization
internal typealias CustomAttributes = Map<String, String>

internal fun CustomAttributesCompat.fix(): CustomAttributes {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.customer.shared.database

import com.squareup.sqldelight.ColumnAdapter
import kotlinx.datetime.Instant

/**
* This file holds [ColumnAdapter] implementations required to store different types in SQLDelight
* database.
*/

object DateTimeInstantAdapter : ColumnAdapter<Instant, Long> {
override fun decode(databaseValue: Long): Instant {
return Instant.fromEpochMilliseconds(databaseValue)
}

override fun encode(value: Instant): Long {
return value.toEpochMilliseconds()
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
package io.customer.shared.database

import io.customer.shared.local.CioDatabase
import com.squareup.sqldelight.EnumColumnAdapter
import local.TrackingTask

internal class DatabaseHelper(databaseDriverFactory: DatabaseDriverFactory) {
private val database = CioDatabase(
driver = databaseDriverFactory.createDriver(databaseName = DATABASE_NAME),
trackingTaskAdapter = TrackingTask.Adapter(
createdAtAdapter = DateTimeInstantAdapter,
updatedAtAdapter = DateTimeInstantAdapter,
expiresAtAdapter = DateTimeInstantAdapter,
stalesAtAdapter = DateTimeInstantAdapter,
identityTypeAdapter = EnumColumnAdapter(),
queueTaskStatusAdapter = EnumColumnAdapter(),
errorReasonAdapter = EnumColumnAdapter(),
),
)
private val dbQuery = database.cioDatabaseQueries
val trackingTaskQueries = database.trackingTaskQueries

companion object {
const val DATABASE_NAME = "customerio.db"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package io.customer.shared.database

import com.squareup.sqldelight.TransactionWithReturn
import io.customer.shared.common.QueueTaskResult
import io.customer.shared.sdk.config.BackgroundQueueConfig
import io.customer.shared.sdk.meta.Workspace
import io.customer.shared.tracking.constant.Priority
import io.customer.shared.tracking.constant.QueueTaskStatus
import io.customer.shared.tracking.model.*
import io.customer.shared.tracking.queue.TaskResultListener
import io.customer.shared.tracking.queue.failure
import io.customer.shared.tracking.queue.success
import io.customer.shared.util.*
import io.customer.shared.work.*
import local.TrackingTask
import local.TrackingTaskQueries

/**
* The class works as a bridge for SQL queries. All queries to database should be made using this
* class to keep some abstraction from the database layer.
*/
internal interface QueryHelper {
@MainDispatcher
fun insertTask(
task: Task,
listener: TaskResultListener<QueueTaskResult>? = null,
)

@BackgroundDispatcher
fun updateTasksStatus(
status: QueueTaskStatus,
tasks: List<TrackingTask>,
): QueueTaskResult

@BackgroundDispatcher
fun updateTasksResponseStatus(
responses: List<TaskResponse>,
): QueueTaskResult

@BackgroundDispatcher
fun selectAllPendingTasks(): List<TrackingTask>?

@MainDispatcher
fun clearAllExpiredTasks()
}

internal class QueryHelperImpl(
private val logger: Logger,
private val dateTimeUtil: DateTimeUtil,
private val jsonAdapter: JsonAdapter,
override val executor: CoroutineExecutor,
private val workspace: Workspace,
private val backgroundQueueConfig: BackgroundQueueConfig,
private val trackingTaskQueries: TrackingTaskQueries,
) : QueryHelper, CoroutineExecutable {
private val selectAllPendingQuery = trackingTaskQueries.selectAllPendingTasks(
status = listOf(QueueTaskStatus.PENDING, QueueTaskStatus.FAILED),
limit = backgroundQueueConfig.batchTasksMax.toLong(),
siteId = workspace.siteId,
)

@Throws(Exception::class)
@WithinTransaction
private fun Task.mergeWithSimilarPending(): Activity {
if (!activity.isUnique()) return activity

val newActivity = activity
val oldActivityTask = trackingTaskQueries.selectByType(
type = newActivity.type,
siteId = workspace.siteId,
).executeAsOneOrNull()

var mergedActivity: Activity? = null
if (oldActivityTask != null && oldActivityTask.queueTaskStatus != QueueTaskStatus.SENT) {
val result = kotlin.runCatching {
newActivity.merge(other = jsonAdapter.parseToActivity(oldActivityTask.activityJson))
}
result.fold(
onSuccess = { value -> mergedActivity = value },
onFailure = { ex ->
logger.fatal("Failed to parse activity ${oldActivityTask.type}, model version ${oldActivityTask.activityModelVersion}. Reason: ${ex.message}")
},
)
}
return mergedActivity ?: newActivity
}

@WithinTransaction
private fun updateStatusInternal(
status: QueueTaskStatus,
tasks: List<TrackingTask>,
): QueueTaskResult {
val taskIds = tasks.map { task -> task.id }
val result = kotlin.runCatching {
trackingTaskQueries.updateTasksStatus(
updatedAt = dateTimeUtil.now,
status = status,
ids = taskIds,
siteId = workspace.siteId,
)
}
result.onFailure { ex ->
logger.error("Unable to update status $status for tasks ${taskIds.joinToString(separator = ",")}. Reason: ${ex.message}")
}
return result.isSuccess
}

private fun <Result : Any?> runInTransaction(
block: TransactionWithReturn<Result>.() -> Result,
): Result {
return trackingTaskQueries.transactionWithResult(noEnclosing = false) { block() }
}

private fun <Result : Any> runInTransactionAsync(
block: TransactionWithReturn<Result>.() -> Result,
) = runSuspended { runInTransaction(block = block) }

@MainDispatcher
override fun insertTask(task: Task, listener: TaskResultListener<QueueTaskResult>?) {
runInTransactionAsync {
val activity = task.mergeWithSimilarPending()
val result = kotlin.runCatching {
val currentTime = dateTimeUtil.now
val json = jsonAdapter.parseToString(activity = activity)

trackingTaskQueries.insertOrReplaceTasks(
id = activity.generateUniqueID(),
siteId = workspace.siteId,
type = activity.type,
createdAt = currentTime,
updatedAt = currentTime,
expiresAt = null,
stalesAt = null,
identity = task.profileIdentifier,
identityType = workspace.identityType,
activityJson = json,
activityModelVersion = activity.modelVersion,
queueTaskStatus = QueueTaskStatus.PENDING,
priority = Priority.DEFAULT,
)
logger.debug("Adding task ${activity.type} to queue successful")

if (activity is Activity.IdentifyProfile) {
trackingTaskQueries.updateAllAnonymousTasks(
updatedAt = currentTime,
identifier = task.profileIdentifier,
identityType = workspace.identityType,
siteId = workspace.siteId,
)
logger.debug("Updating identifier with ${task.profileIdentifier} and identity type ${workspace.identityType}")
}
}
result.fold(
onSuccess = {
runSuspended { listener?.success() }
},
onFailure = { ex ->
logger.error("Unable to add $activity to queue, skipping task. Reason: ${ex.message}")
runSuspended { listener?.failure(exception = ex) }
},
)
}
}

@BackgroundDispatcher
override fun updateTasksStatus(
status: QueueTaskStatus,
tasks: List<TrackingTask>,
): QueueTaskResult = runInTransaction {
updateStatusInternal(status = status, tasks = tasks)
}

@BackgroundDispatcher
override fun updateTasksResponseStatus(
responses: List<TaskResponse>,
): QueueTaskResult = runInTransaction {
val result = kotlin.runCatching {
val updatedAtTime = dateTimeUtil.now
responses.forEach { response ->
trackingTaskQueries.updateFailedTaskStatus(
updatedAt = updatedAtTime,
status = response.taskStatus,
statusCode = response.statusCode,
errorReason = response.errorReason,
ids = listOf(response.id),
siteId = workspace.siteId,
)
}
}
result.onFailure { ex ->
logger.error("Unable to updated response status for ${responses.size} pending tasks due to: ${ex.message}")
}
return@runInTransaction result.isSuccess
}

@BackgroundDispatcher
override fun selectAllPendingTasks(): List<TrackingTask>? = runInTransaction {
val pendingTasks = selectAllPendingQuery.executeAsList()
val result = updateStatusInternal(
status = QueueTaskStatus.QUEUED,
tasks = pendingTasks,
)
return@runInTransaction pendingTasks.takeIf { result }
}

@MainDispatcher
override fun clearAllExpiredTasks() {
runInTransaction {
trackingTaskQueries.clearAllTasksWithStatus(
status = listOf(QueueTaskStatus.SENT, QueueTaskStatus.INVALID),
siteId = workspace.siteId,
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.customer.shared.database

/**
* Denotes that the annotated method should only be called on a background dispatcher.
* If the annotated element is a class, then all methods in the class should be called
* on background dispatcher.
*/
@Target(
AnnotationTarget.FUNCTION,
AnnotationTarget.PROPERTY_GETTER,
AnnotationTarget.PROPERTY_SETTER,
AnnotationTarget.CONSTRUCTOR,
AnnotationTarget.ANNOTATION_CLASS,
AnnotationTarget.CLASS,
AnnotationTarget.VALUE_PARAMETER,
)
@Retention(AnnotationRetention.SOURCE)
annotation class WithinTransaction
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.customer.shared.device

interface UserAgentStore {
/**
* buildUserAgent - To get `user-agent` header value. This value depends on SDK version
* and device detail such as OS version, device model, customer's app name etc
*
* If the device and OS information is available, it should return in following format :
* `Customer.io Android Client/1.0.0-alpha.6 (Google Pixel 6; 30) User App/1.0`
*
* Otherwise return SDK info only
* `Customer.io Android Client/1.0.0-alpha.6`
*/
fun buildUserAgent(): String
}
Loading

0 comments on commit bce0b18

Please sign in to comment.