Skip to content

Commit

Permalink
doc: add doc for ChunkRead and ParallelChunkRead
Browse files Browse the repository at this point in the history
  • Loading branch information
duke-git committed Mar 5, 2024
1 parent 9bfdc68 commit c58c503
Show file tree
Hide file tree
Showing 8 changed files with 386 additions and 25 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,10 @@ import "github.com/duke-git/lancet/v2/fileutil"
[[play](https://go.dev/play/p/GhLS6d8lH_g)]
- **<big>ReadFile</big>** : read file or url.
[[doc](https://github.com/duke-git/lancet/blob/main/docs/en/api/packages/fileutil.md#ReadFile)]
- **<big>ChunkRead</big>** : reads a block from the file at the specified offset and returns all lines within the block.
[[doc](https://github.com/duke-git/lancet/blob/main/docs/en/api/packages/fileutil.md#ChunkRead)]
- **<big>ParallelChunkRead</big>** : reads the file in parallel and send each chunk of lines to the specified channel.
[[doc](https://github.com/duke-git/lancet/blob/main/docs/en/api/packages/fileutil.md#ParallelChunkRead)]

<h3 id="formatter"> 10. Formatter contains some functions for data formatting. &nbsp; &nbsp; &nbsp; &nbsp;<a href="#index">index</a></h3>

Expand Down
6 changes: 6 additions & 0 deletions README_zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,12 @@ import "github.com/duke-git/lancet/v2/fileutil"
[[play](https://go.dev/play/p/GhLS6d8lH_g)]
- **<big>ReadFile</big>** : 读取文件或者URL。
[[doc](https://github.com/duke-git/lancet/blob/main/docs/api/packages/fileutil.md#ReadFile)]
- **<big>ChunkRead</big>** : 从文件的指定偏移读取块并返回块内所有行。
[[doc](https://github.com/duke-git/lancet/blob/main/docs/api/packages/fileutil.md#ChunkRead)]
- **<big>ParallelChunkRead</big>** : 并行读取文件并将每个块的行发送到指定通道。
[[doc](https://github.com/duke-git/lancet/blob/main/docs/api/packages/fileutil.md#ParallelChunkRead)]



<h3 id="formatter"> 10. formatter 格式化器包含一些数据格式化处理方法。&nbsp; &nbsp; &nbsp; &nbsp;<a href="#index">回到目录</a></h3>

Expand Down
116 changes: 116 additions & 0 deletions docs/api/packages/fileutil.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import (
- [WriteStringToFile](#WriteStringToFile)
- [WriteBytesToFile](#WriteBytesToFile)
- [ReadFile](#ReadFile)
- [ChunkRead](#ChunkRead)
- [ParallelChunkRead](#ParallelChunkRead)

<div STYLE="page-break-after: always;"></div>

Expand Down Expand Up @@ -955,9 +957,123 @@ func main() {
if err != nil {
return
}

fmt.Println(string(dat))

// Output:
// User-agent: *
// Disallow: /deny
}
```

### <span id="ChunkRead">ChunkRead</span>

<p>从文件的指定偏移读取块并返回块内所有行。</p>

<b>函数签名:</b>

```go
func ChunkRead(file *os.File, offset int64, size int, bufPool *sync.Pool) ([]string, error)
```

<b>示例:</b>

```go
package main

import (
"fmt"
"github.com/duke-git/lancet/v2/fileutil"
)

func main() {
const mb = 1024 * 1024
const defaultChunkSizeMB = 100

// test1.csv file content:
// Lili,22,female
// Jim,21,male
filePath := "./testdata/test1.csv" // 替换为你的文件路径
f, err := os.Open(filePath)
if err != nil {
return
}

defer f.Close()

var bufPool = sync.Pool{
New: func() interface{} {
return make([]byte, 0, defaultChunkSizeMB*mb)
},
}

lines, err := fileutil.ChunkRead(f, 0, 100, &bufPool)
if err != nil {
return
}

fmt.Println(lines[0])
fmt.Println(lines[1])

// Output:
// Lili,22,female
// Jim,21,male
}
```

### <span id="ParallelChunkRead">ParallelChunkRead</span>

<p>并行读取文件并将每个块的行发送到指定通道。</p>

<b>函数签名:</b>

```go
// filePath:文件路径
// chunkSizeMB: 分块的大小(单位MB,设置为0时使用默认100MB),设置过大反而不利,视情调整
// maxGoroutine: 并发读取分块的数量,设置为0时使用CPU核心数
// linesCh: 用于接收返回结果的通道。
func ParallelChunkRead(filePath string, linesCh chan<- []string, chunkSizeMB, maxGoroutine int) error
```

<b>示例:</b>

```go
package main

import (
"fmt"
"github.com/duke-git/lancet/v2/fileutil"
)

func main() {
const mb = 1024 * 1024
const defaultChunkSizeMB = 100 // 默认值

numParsers := runtime.NumCPU()

linesCh := make(chan []string, numParsers)

// test1.csv file content:
// Lili,22,female
// Jim,21,male
filePath := "./testdata/test1.csv"

go fileutil.ParallelChunkRead(filePath, linesCh, defaultChunkSizeMB, numParsers)

var totalLines int
for lines := range linesCh {
totalLines += len(lines)

for _, line := range lines {
fmt.Println(line)
}
}

fmt.Println(totalLines)

// Output:
// Lili,22,female
// Jim,21,male
// 2
}
```
114 changes: 114 additions & 0 deletions docs/en/api/packages/fileutil.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import (
- [WriteStringToFile](#WriteStringToFile)
- [WriteBytesToFile](#WriteBytesToFile)
- [ReadFile](#ReadFile)
- [ChunkRead](#ChunkRead)
- [ParallelChunkRead](#ParallelChunkRead)

<div STYLE="page-break-after: always;"></div>

Expand Down Expand Up @@ -961,3 +963,115 @@ func main() {
// Disallow: /deny
}
```

### <span id="ChunkRead">ChunkRead</span>

<p>reads a block from the file at the specified offset and returns all lines within the block.</p>

<b>Signature :</b>

```go
func ChunkRead(file *os.File, offset int64, size int, bufPool *sync.Pool) ([]string, error)
```

<b>Example:</b>

```go
package main

import (
"fmt"
"github.com/duke-git/lancet/v2/fileutil"
)

func main() {
const mb = 1024 * 1024
const defaultChunkSizeMB = 100

// test1.csv file content:
// Lili,22,female
// Jim,21,male
filePath := "./testdata/test1.csv"
f, err := os.Open(filePath)
if err != nil {
return
}

defer f.Close()

var bufPool = sync.Pool{
New: func() interface{} {
return make([]byte, 0, defaultChunkSizeMB*mb)
},
}

lines, err := fileutil.ChunkRead(f, 0, 100, &bufPool)
if err != nil {
return
}

fmt.Println(lines[0])
fmt.Println(lines[1])

// Output:
// Lili,22,female
// Jim,21,male
}
```

### <span id="ParallelChunkRead">ParallelChunkRead</span>

<p>Reads the file in parallel and send each chunk of lines to the specified channel.</p>

<b>Signature :</b>

```go
// filePath: file path.
// chunkSizeMB: The size of the block (in MB, the default is 100MB when set to 0). Setting it too large will be detrimental. Adjust it as appropriate.
// maxGoroutine: The number of concurrent read chunks, the number of CPU cores used when set to 0.
// linesCh: The channel used to receive the returned results.
func ParallelChunkRead(filePath string, linesCh chan<- []string, chunkSizeMB, maxGoroutine int) error
```

<b>Example:</b>

```go
package main

import (
"fmt"
"github.com/duke-git/lancet/v2/fileutil"
)

func main() {
const mb = 1024 * 1024
const defaultChunkSizeMB = 100 // 默认值

numParsers := runtime.NumCPU()

linesCh := make(chan []string, numParsers)

// test1.csv file content:
// Lili,22,female
// Jim,21,male
filePath := "./testdata/test1.csv"

go fileutil.ParallelChunkRead(filePath, linesCh, defaultChunkSizeMB, numParsers)

var totalLines int
for lines := range linesCh {
totalLines += len(lines)

for _, line := range lines {
fmt.Println(line)
}
}

fmt.Println(totalLines)

// Output:
// Lili,22,female
// Jim,21,male
// 2
}
```
52 changes: 30 additions & 22 deletions fileutil/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"fmt"
"io"
"io/fs"
"log"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -869,12 +868,13 @@ func isCsvSupportedType(v interface{}) bool {
}
}

// ChunkRead 从文件的指定偏移读取块并返回块内所有行
func ChunkRead(f *os.File, offset int64, size int, bufPool *sync.Pool) []string {
// ChunkRead reads a block from the file at the specified offset and returns all lines within the block
// Play: todo
func ChunkRead(file *os.File, offset int64, size int, bufPool *sync.Pool) ([]string, error) {
buf := bufPool.Get().([]byte)[:size] // 从Pool获取缓冲区并调整大小
n, err := f.ReadAt(buf, offset) // 从指定偏移读取数据到缓冲区
n, err := file.ReadAt(buf, offset) // 从指定偏移读取数据到缓冲区
if err != nil && err != io.EOF {
log.Fatal(err)
return nil, err
}
buf = buf[:n] // 调整切片以匹配实际读取的字节数

Expand All @@ -893,58 +893,64 @@ func ChunkRead(f *os.File, offset int64, size int, bufPool *sync.Pool) []string
lines = append(lines, line)
}
bufPool.Put(buf) // 读取完成后,将缓冲区放回Pool
return lines
return lines, nil
}

// 并行读取文件并将每个块的行发送到指定通道
// ParallelChunkRead reads the file in parallel and send each chunk of lines to the specified channel.
// filePath 文件路径
// ChunkSizeMB 分块的大小(单位MB,设置为0时使用默认100MB),设置过大反而不利,视情调整
// MaxGoroutine 并发读取分块的数量,设置为0时使用CPU核心数
// chunkSizeMB 分块的大小(单位MB,设置为0时使用默认100MB),设置过大反而不利,视情调整
// maxGoroutine 并发读取分块的数量,设置为0时使用CPU核心数
// linesCh用于接收返回结果的通道。
func ParallelChunkRead(filePath string, linesCh chan<- []string, ChunkSizeMB, MaxGoroutine int) {
if ChunkSizeMB == 0 {
ChunkSizeMB = 100
// Play: todo
func ParallelChunkRead(filePath string, linesCh chan<- []string, chunkSizeMB, maxGoroutine int) error {
if chunkSizeMB == 0 {
chunkSizeMB = 100
}
ChunkSize := ChunkSizeMB * 1024 * 1024
chunkSize := chunkSizeMB * 1024 * 1024
// 内存复用
bufPool := sync.Pool{
New: func() interface{} {
return make([]byte, 0, ChunkSize)
return make([]byte, 0, chunkSize)
},
}

if MaxGoroutine == 0 {
MaxGoroutine = runtime.NumCPU() // 设置为0时使用CPU核心数
if maxGoroutine == 0 {
maxGoroutine = runtime.NumCPU() // 设置为0时使用CPU核心数
}

f, err := os.Open(filePath)
if err != nil {
log.Fatalf("failed to open file: %v", err)
return err
}

defer f.Close()

info, err := f.Stat()
if err != nil {
log.Fatalf("failed to get file info: %v", err)
return err
}

wg := sync.WaitGroup{}
chunkOffsetCh := make(chan int64, MaxGoroutine)
chunkOffsetCh := make(chan int64, maxGoroutine)

// 分配工作
go func() {
for i := int64(0); i < info.Size(); i += int64(ChunkSize) {
for i := int64(0); i < info.Size(); i += int64(chunkSize) {
chunkOffsetCh <- i
}
close(chunkOffsetCh)
}()

// 启动工作协程
for i := 0; i < MaxGoroutine; i++ {
for i := 0; i < maxGoroutine; i++ {
wg.Add(1)
go func() {
for chunkOffset := range chunkOffsetCh {
linesCh <- ChunkRead(f, chunkOffset, ChunkSize, &bufPool)
chunk, err := ChunkRead(f, chunkOffset, chunkSize, &bufPool)
if err == nil {
linesCh <- chunk
}

}
wg.Done()
}()
Expand All @@ -953,4 +959,6 @@ func ParallelChunkRead(filePath string, linesCh chan<- []string, ChunkSizeMB, Ma
// 等待所有解析完成后关闭行通道
wg.Wait()
close(linesCh)

return nil
}
Loading

0 comments on commit c58c503

Please sign in to comment.