Skip to content

Commit

Permalink
use a context for cancelation
Browse files Browse the repository at this point in the history
  • Loading branch information
jbardin committed Jan 28, 2018
1 parent e71e76a commit 89aff3b
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 12 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[![GoDoc](https://godoc.org/github.com/jbardin/readchan?status.svg)](https://godoc.org/github.com/jbardin/readchan)

Package readchan provides methods for interating over an `io.Reader` by block
Package readchan provides methods for iterating over an `io.Reader` by block
or line, and reading the results via a channel.


Expand All @@ -11,7 +11,7 @@ or line, and reading the results via a channel.
log.Fatal(err)
}

for line := range readchan.Lines(f, 1, nil) {
for line := range readchan.Lines(context.TODO(), f, 1) {
if line.Err != nil {
log.Fatal(err)
}
Expand Down
13 changes: 7 additions & 6 deletions readchan.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package readchan

import (
"bufio"
"context"
"io"
"sync"
)
Expand Down Expand Up @@ -31,9 +32,9 @@ func (c *Chunk) Done() {
//
// The maxSize argument sets the allocated capacity of each []byte. Reads will
// buffer readAhead number of Chunks in the channel as soon as they are
// available. Closing the cancel channel will cause Reads loop to return, but
// available. Canceling the context will cause the Reads loop to return, but
// it cannot interrupt pending Read calls on r.
func Reads(r io.Reader, maxSize, readAhead int, cancel chan bool) <-chan *Chunk {
func Reads(ctx context.Context, r io.Reader, maxSize, readAhead int) <-chan *Chunk {
if maxSize <= 0 {
panic("invalid max buffer size")
}
Expand Down Expand Up @@ -69,7 +70,7 @@ func Reads(r io.Reader, maxSize, readAhead int, cancel chan bool) <-chan *Chunk

select {
case readChan <- chunk:
case <-cancel:
case <-ctx.Done():
return
}

Expand All @@ -87,10 +88,10 @@ func Reads(r io.Reader, maxSize, readAhead int, cancel chan bool) <-chan *Chunk
// characters.
//
// The readAhead argument determines the buffer size for the channel, which
// will be filled as soon as data available. Closing the cancel channel will
// will be filled as soon as data available. Canceling the context will
// cause the Lines scanner loop to return, but it cannot interrupt pending Read
// calls on r.
func Lines(r io.Reader, readAhead int, cancel chan bool) <-chan *Chunk {
func Lines(ctx context.Context, r io.Reader, readAhead int) <-chan *Chunk {
if readAhead < 0 {
readAhead = 1
}
Expand Down Expand Up @@ -119,7 +120,7 @@ func Lines(r io.Reader, readAhead int, cancel chan bool) <-chan *Chunk {

select {
case readChan <- chunk:
case <-cancel:
case <-ctx.Done():
return
}
}
Expand Down
9 changes: 5 additions & 4 deletions readchan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package readchan
import (
"bytes"
"compress/gzip"
"context"
"io"
"io/ioutil"
"log"
Expand Down Expand Up @@ -43,7 +44,7 @@ func TestChunkReader(t *testing.T) {

var newData []byte

readChan := Reads(testReader, 1024, 1, nil)
readChan := Reads(context.TODO(), testReader, 1024, 1)
for chunk := range readChan {
newData = append(newData, chunk.Data...)
chunk.Done()
Expand All @@ -64,7 +65,7 @@ func TestLineReader(t *testing.T) {
var newData []byte

lines := 0
readChan := Lines(testReader, 1, nil)
readChan := Lines(context.TODO(), testReader, 1)
for chunk := range readChan {
newData = append(newData, chunk.Data...)
newData = append(newData, '\n')
Expand Down Expand Up @@ -94,7 +95,7 @@ func BenchmarkChunkReader(b *testing.B) {
for i := 0; i < b.N; i++ {
newData = newData[:0]
testReader.Seek(0, 0)
readChan := Reads(testReader, 1024, 1, nil)
readChan := Reads(context.TODO(), testReader, 1024, 1)

for chunk := range readChan {
newData = append(newData, chunk.Data...)
Expand All @@ -116,7 +117,7 @@ func BenchmarkLineReader(b *testing.B) {
for i := 0; i < b.N; i++ {
newData = newData[:0]
testReader.Seek(0, 0)
readChan := Lines(testReader, 1, nil)
readChan := Lines(context.TODO(), testReader, 1)
for chunk := range readChan {
newData = append(newData, chunk.Data...)
chunk.Done()
Expand Down

0 comments on commit 89aff3b

Please sign in to comment.