Skip to content

Commit

Permalink
Buffer GzipStreamTransform input
Browse files Browse the repository at this point in the history
  • Loading branch information
pete-signal authored Oct 24, 2024
1 parent d9e4112 commit 08dd35e
Showing 1 changed file with 37 additions and 15 deletions.
52 changes: 37 additions & 15 deletions SignalServiceKit/Util/StreamTransform/GzipStreamTransform.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public class GzipStreamTransform: StreamTransform, FinalizableStreamTransform {
}

private enum Constants {
static let BufferSize: Int = 32_768
static let BufferSize: Int = 65_536
static let MaxBufferSize: Int = BufferSize * 4

// Use the maximum memory window (32K) for compressing the data
static let MaxWindowBits = MAX_WBITS
Expand Down Expand Up @@ -83,14 +84,16 @@ public class GzipStreamTransform: StreamTransform, FinalizableStreamTransform {
try process(data: data, finalize: false)
}

private var buffer = Data(capacity: Constants.BufferSize)
private var currentOffset: UInt = 0
private var currentCapcity: Int = Constants.BufferSize

private func process(data: Data, finalize: Bool) throws -> Data {

let flags: Int32 = finalize ? Z_FINISH : Z_NO_FLUSH
var status: Int32 = Z_OK

var returnData = Data()
var buffer = Data(count: Constants.BufferSize)
var bufferWritten: UInt = 0
let startOffset = currentOffset

data.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) in

Expand All @@ -101,17 +104,24 @@ public class GzipStreamTransform: StreamTransform, FinalizableStreamTransform {
// Set stream.avail_in to the size of the remaining passed in data
stream.avail_in = UInt32(clamping: data.count)

// If avail_out is less than the size of the input, pre-emptively rewind to the beginning

repeat {
// From zlib docs:
// "If inflate (or deflate) returns Z_OK and with zero avail_out, it must be called again
// after making room in the output buffer because there might be more output pending."
//
// If this is encountered, move the current buffer into `returnData` and reset to an empty buffer
if stream.avail_out == 0 {
returnData.append(buffer)
buffer = Data(count: Constants.BufferSize)
bufferWritten = 0
stream.avail_out = UInt32(Constants.BufferSize)
if buffer.count >= Constants.MaxBufferSize {
// reset to beginning of array
currentOffset = 0
} else {
currentCapcity += Constants.BufferSize
buffer.reserveCapacity(currentCapcity)
// currentOffset can remain the same
}
stream.avail_out = UInt32(UInt(currentCapcity) - currentOffset)
}

buffer.withUnsafeMutableBytes { (outputPtr: UnsafeMutableRawBufferPointer) in
Expand All @@ -121,7 +131,7 @@ public class GzipStreamTransform: StreamTransform, FinalizableStreamTransform {
// inflate/deflate from returning without having processed the entire input.
// If this happens, and `avail_out` > 0, we should attempt to append to the output
// buffer on subsequent calls into inflate/deflate
stream.next_out = outputPtr.bindMemory(to: Bytef.self).baseAddress!.advanced(by: Int(clamping: bufferWritten))
stream.next_out = outputPtr.bindMemory(to: Bytef.self).baseAddress!.advanced(by: Int(clamping: currentOffset))

switch operation {
case .compress:
Expand All @@ -131,7 +141,7 @@ public class GzipStreamTransform: StreamTransform, FinalizableStreamTransform {
}

// stream.avail_out should never be greater than Constants.BufferSize, but clamp just to be sure.
bufferWritten = UInt(clamping: Constants.BufferSize - Int(stream.avail_out))
currentOffset = UInt(clamping: currentCapcity - Int(stream.avail_out))
stream.next_out = nil
}

Expand Down Expand Up @@ -172,11 +182,23 @@ public class GzipStreamTransform: StreamTransform, FinalizableStreamTransform {
throw GzipError.transformFailed
}

// Append the remaining buffer to the return data and reset the stream field.
returnData.append(buffer.subdata(in: 0..<Int(clamping: bufferWritten)))
buffer = Data(count: Constants.BufferSize)
bufferWritten = 0
stream.avail_out = UInt32(Constants.BufferSize)
let returnData = buffer.withUnsafeMutableBytes { (bufferPtr: UnsafeMutableRawBufferPointer) in
if currentOffset >= startOffset {
// Happy path - send back a no-copy reference to the buffer
let shiftedPtr = bufferPtr.bindMemory(to: Bytef.self).baseAddress!.advanced(by: Int(clamping: startOffset))
return Data(bytesNoCopy: shiftedPtr, count: Int(currentOffset - startOffset), deallocator: .none)
} else {
// Looped around to the beginning of the buffer.
// crete data and append
let basePtr = bufferPtr.bindMemory(to: Bytef.self).baseAddress!
let shiftedPtr = basePtr.advanced(by: Int(clamping: startOffset))

let firstPart = Data(bytesNoCopy: shiftedPtr, count: buffer.count - Int(startOffset), deallocator: .none)
let secondPart = Data(bytesNoCopy: basePtr, count: Int(currentOffset), deallocator: .none)

return firstPart + secondPart
}
}

outputCount += returnData.count
return returnData
Expand Down

0 comments on commit 08dd35e

Please sign in to comment.