From 6f55916a2feb14939d79be39aa597e908cd224d3 Mon Sep 17 00:00:00 2001 From: John Cupitt Date: Fri, 6 Dec 2019 13:28:08 +0000 Subject: [PATCH] add IO::-style user streams a thin layer over signal_connect, thanks @janko see https://github.com/janko/image_processing/issues/60#issuecomment-562231721 --- example/stream.rb | 94 ++++++------------------------------------ lib/vips/streamiu.rb | 23 +++++++++++ lib/vips/streamou.rb | 17 ++++++++ spec/stream_spec.rb | 98 ++++++++++---------------------------------- 4 files changed, 73 insertions(+), 159 deletions(-) diff --git a/example/stream.rb b/example/stream.rb index d3097800..f25e99ac 100755 --- a/example/stream.rb +++ b/example/stream.rb @@ -2,85 +2,15 @@ require 'vips' -class Mystreami < Vips::Streamiu - def initialize(filename, pipe_mode=false) - @filename = filename - @contents = File.open(@filename, "rb").read - @length = @contents.length - @pipe_mode = pipe_mode - @read_point = 0 - - puts "Mystreami: loaded #{@length} bytes" - - super() - - signal_connect "read" do |buf, len| - puts "read: #{len} bytes from #{@read_point}" - - bytes_available = @length - @read_point - bytes_to_copy = [bytes_available, len].min - buf.put_bytes(0, @contents, @read_point, bytes_to_copy) - @read_point += bytes_to_copy - - puts " transferred #{bytes_to_copy} bytes" - - bytes_to_copy - end - - signal_connect "seek" do |offset, whence| - puts "seek: offset #{offset}, whence #{whence}" - if @pipe_mode - puts " new_position == -1 (pipe mode)" - return -1 - end - - case whence - when 0 - # SEEK_SET - new_read_point = offset - when 1 - # SEEK_CUR - new_read_point = self.read_point + offset - when 2 - # SEEK_END - new_read_point = self.length + offset - else - raise "bad whence #{whence}" - end - - @read_point = [0, [@length, new_read_point].min].max - - puts " new_position = #{@read_point}" - - @read_point - end - end -end - -class Mystreamo < Vips::Streamou - def initialize(filename) - @filename = filename - @f = File.open(@filename, "wb") - - super() - - signal_connect "write" do |buf, len| - @f.write(buf.get_bytes(0, len)) - len - end - - signal_connect "finish" do - @f.close - @f = nil - end - - end -end - -streamiu = Mystreami.new ARGV[0] -image = Vips::Image.new_from_stream streamiu, "", access: "sequential" -#puts "avg = #{image.avg}" - -streamio = Mystreamo.new ARGV[1] -image.write_to_stream streamio, ".png" - +source = File.open ARGV[0], "rb" +input_stream = Vips::Streamiu.new +input_stream.on_read { |length| source.read length } +input_stream.on_seek { |offset, whence| source.seek(offset, whence) } + +dest = File.open ARGV[1], "w" +output_stream = Vips::Streamou.new +output_stream.on_write { |chunk| dest.write(chunk) } +output_stream.on_finish { dest.close } + +image = Vips::Image.new_from_stream input_stream, "", access: "sequential" +image.write_to_stream output_stream, ".png" diff --git a/lib/vips/streamiu.rb b/lib/vips/streamiu.rb index 8416294f..e957472a 100644 --- a/lib/vips/streamiu.rb +++ b/lib/vips/streamiu.rb @@ -46,5 +46,28 @@ def initialize super pointer end + # The block is executed to read data from the source. The interface is + # exactly as IO::seek, ie. it takes a maximum number of bytes to read and + # returns a string of bytes from the source, or nil if the source is already + # at eod of file. + def on_read &block + signal_connect "read" do |buf, len| + chunk = block.call len + return 0 if chunk == nil + + buf.put_bytes(0, chunk, 0, chunk.bytesize) + chunk.bytesize + end + end + + # The block is executed to seek the source. The interface is exactly as + # IO::seek, ie. it should take an offset and whence, and return the new read + # position. + def on_seek &block + signal_connect "seek" do |offset, whence| + block.call offset, whence + end + end + end end diff --git a/lib/vips/streamou.rb b/lib/vips/streamou.rb index b5ddd555..128d9876 100644 --- a/lib/vips/streamou.rb +++ b/lib/vips/streamou.rb @@ -46,5 +46,22 @@ def initialize super pointer end + # The block is executed to write data to the source. The interface is + # exactly as IO::write, ie. it should write the string and return the + # number of bytes written. + def on_write &block + signal_connect "write" do |p, len| + block.call(p.get_bytes(0, len)) + end + end + + # The block is executed at the end of write. It should do any necessary + # finishing action. + def on_finish &block + signal_connect "finish" do + block.call() + end + end + end end diff --git a/spec/stream_spec.rb b/spec/stream_spec.rb index 5fdc1290..8126084f 100644 --- a/spec/stream_spec.rb +++ b/spec/stream_spec.rb @@ -107,82 +107,20 @@ end end -if Vips::at_least_libvips?(8, 9) - class Mystreami < Vips::Streamiu - def initialize(filename, pipe_mode=false) - @filename = filename - @contents = File.open(@filename, "rb").read - @length = @contents.length - @pipe_mode = pipe_mode - @read_point = 0 - - super() - - signal_connect "read" do |buf, len| - bytes_available = @length - @read_point - bytes_to_copy = [bytes_available, len].min - buf.put_bytes(0, @contents, @read_point, bytes_to_copy) - @read_point += bytes_to_copy - - bytes_to_copy - end - - signal_connect "seek" do |offset, whence| - if @pipe_mode - -1 - else - case whence - when 0 - # SEEK_SET - new_read_point = offset - when 1 - # SEEK_CUR - new_read_point = self.read_point + offset - when 2 - # SEEK_END - new_read_point = self.length + offset - else - raise "bad whence #{whence}" - end - - @read_point = [0, [@length, new_read_point].min].max - end - end - end - end - - class Mystreamo < Vips::Streamou - def initialize(filename) - @filename = filename - @f = File.open(@filename, "wb") - - super() - - signal_connect "write" do |buf, len| - @f.write(buf.get_bytes(0, len)) - len - end - - signal_connect "finish" do - @f.close - @f = nil - end - - end - end -end - if Vips::at_least_libvips?(8, 9) RSpec.describe Vips::Streamiu do it 'can create a user input stream' do - streamiu = Mystreami.new simg('wagon.jpg') + input_stream = Vips::Streamiu.new - expect(streamiu) + expect(input_stream) end it 'can load a user stream' do - streamiu = Mystreami.new simg('wagon.jpg') - image = Vips::Image.new_from_stream streamiu, '' + source = File.open simg('wagon.jpg'), "rb" + input_stream = Vips::Streamiu.new + input_stream.on_read { |length| source.read length } + input_stream.on_seek { |offset, whence| source.seek(offset, whence) } + image = Vips::Image.new_from_stream input_stream, "" expect(image) expect(image.width).to eq(685) @@ -191,9 +129,12 @@ def initialize(filename) expect(image.avg).to be_within(0.001).of(109.789) end - it 'can load a user stream in pipe mode' do - streamiu = Mystreami.new simg('wagon.jpg'), true - image = Vips::Image.new_from_stream streamiu, '' + it 'on_seek is optional' do + source = File.open simg('wagon.jpg'), "rb" + input_stream = Vips::Streamiu.new + input_stream.on_read { |length| source.read length } + input_stream.on_seek { |offset, whence| -1 } + image = Vips::Image.new_from_stream input_stream, "" expect(image) expect(image.width).to eq(685) @@ -203,16 +144,19 @@ def initialize(filename) end it 'can create a user output stream' do - streamou = Mystreamo.new timg('x.jpg') + output_stream = Vips::Streamou.new - expect(streamou) + expect(output_stream) end it 'can write an image to a user output stream' do - image = Vips::Image.new_from_file simg('wagon.jpg') filename = timg('x5.png') - streamou = Mystreamo.new filename - image.write_to_stream streamou, '.png' + dest = File.open filename, "wb" + output_stream = Vips::Streamou.new + output_stream.on_write { |chunk| dest.write(chunk) } + output_stream.on_finish { dest.close } + image = Vips::Image.new_from_file simg('wagon.jpg') + image.write_to_stream output_stream, ".png" image = Vips::Image.new_from_file filename expect(image)