Skip to content

Commit

Permalink
fix retry read buffer problem
Browse files Browse the repository at this point in the history
  • Loading branch information
gnehil committed Nov 14, 2023
1 parent de60f63 commit ce1a02f
Showing 1 changed file with 21 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,10 @@ class DorisWriter(settings: SparkSettings, preCommittedTxnAcc: CollectionAccumul
override def next(): T = {
recordCount += 1
if (batchRetryEnable) {
if (isReset && buffer.nonEmpty) {
buffer(recordCount)
if (isReset) {
readBuffer()
} else {
val elem = iterator.next
buffer += elem
elem
writeBufferAndReturn()
}
} else {
iterator.next
Expand All @@ -172,8 +170,10 @@ class DorisWriter(settings: SparkSettings, preCommittedTxnAcc: CollectionAccumul
*/
def reset(): Unit = {
recordCount = 0
isReset = true
logger.info("batch iterator is reset")
isReset = buffer.nonEmpty
if (isReset) {
logger.info("buffer is not empty and batch iterator is reset")
}
}

/**
Expand All @@ -186,6 +186,20 @@ class DorisWriter(settings: SparkSettings, preCommittedTxnAcc: CollectionAccumul
}
}

private def readBuffer(): T = {
if (recordCount == buffer.size) {
logger.debug("read buffer end, recordCount:{}, bufferSize: {}", recordCount, buffer.size)
isReset = false
}
buffer(recordCount - 1)
}

private def writeBufferAndReturn(): T = {
val elem = iterator.next
buffer += elem
elem
}

}


Expand Down

0 comments on commit ce1a02f

Please sign in to comment.