Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/libdocker: buffered capture of client logs #735

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 101 additions & 40 deletions internal/libdocker/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ func (b *ContainerBackend) CreateContainer(ctx context.Context, imageName string
// but it's probably best to give Docker the info as early as possible.
createOpts.Config.AttachStdout = true
}
if opt.LogFile != "" {
createOpts.HostConfig = &docker.HostConfig{
LogConfig: docker.LogConfig{
Config: map[string]string{
"mode": "non-blocking",
"max-buffer-size": "4m",
},
},
}
}

c, err := b.client.CreateContainer(createOpts)
if err != nil {
Expand Down Expand Up @@ -331,49 +341,46 @@ func (b *ContainerBackend) uploadFiles(ctx context.Context, id string, files map
// starts executing the container and returns the CloseWaiter to allow the caller
// to wait for termination.
func (b *ContainerBackend) runContainer(ctx context.Context, logger log15.Logger, id string, opts libhive.ContainerOptions) (docker.CloseWaiter, error) {
var (
outStream io.Writer
errStream io.Writer
closer = newFileCloser(logger)
)
logger.Debug("starting container")
if err := b.client.StartContainerWithContext(id, nil, ctx); err != nil {
logger.Error("failed to start container", "err", err)
return nil, err
}

var closer *fileCloser
var err error
switch {
case opts.Output != nil && opts.LogFile != "":
return nil, fmt.Errorf("can't use LogFile and Output options at the same time")

case opts.Output != nil:
outStream = opts.Output
closer.addFile(opts.Output)

// If console logging is requested, dump stderr there.
if b.config.ContainerOutput != nil {
prefixer := newLinePrefixWriter(b.config.ContainerOutput, fmt.Sprintf("[%s] ", id[:8]))
closer.addFile(prefixer)
errStream = prefixer

// outStream = io.MultiWriter(outStream, prefixer)
}

closer, err = b.attachIO(ctx, logger, id, opts)
case opts.LogFile != "":
// Redirect container output to logfile.
if err := os.MkdirAll(filepath.Dir(opts.LogFile), 0755); err != nil {
return nil, err
}
log, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_CREATE|os.O_SYNC|os.O_TRUNC, 0644)
if err != nil {
return nil, err
}
closer.addFile(log)
outStream = log

// If console logging was requested, tee the output and tag it with the container id.
if b.config.ContainerOutput != nil {
prefixer := newLinePrefixWriter(b.config.ContainerOutput, fmt.Sprintf("[%s] ", id[:8]))
closer.addFile(prefixer)
outStream = io.MultiWriter(log, prefixer)
closer, err = b.attachLogs(ctx, logger, id, opts)
}
if err != nil {
if closer != nil {
closer.Close()
}
// In LogFile mode, stderr is redirected to stdout.
errStream = outStream
return nil, err
}
return closer, nil
}

func (b *ContainerBackend) attachIO(ctx context.Context, logger log15.Logger, id string, opts libhive.ContainerOptions) (*fileCloser, error) {
var (
outStream io.Writer
errStream io.Writer
closer = newFileCloser(logger)
)
closer.addFile(opts.Output)
outStream = opts.Output

// If console logging is requested, dump stderr there.
if b.config.ContainerOutput != nil {
prefixer := newLinePrefixWriter(b.config.ContainerOutput, fmt.Sprintf("[%s] ", id[:8]))
closer.addFile(prefixer)
errStream = prefixer
// outStream = io.MultiWriter(outStream, prefixer)
}

// Configure the streams and attach.
Expand Down Expand Up @@ -401,16 +408,67 @@ func (b *ContainerBackend) runContainer(ctx context.Context, logger log15.Logger
return nil, err
}
closer.w = waiter
return closer, nil
}

logger.Debug("starting container")
if err := b.client.StartContainerWithContext(id, nil, ctx); err != nil {
closer.Close()
logger.Error("failed to start container", "err", err)
func (b *ContainerBackend) attachLogs(ctx context.Context, logger log15.Logger, id string, opts libhive.ContainerOptions) (*fileCloser, error) {
if opts.Input != nil {
return nil, fmt.Errorf("container input is not supported in logFile mode")
}
if err := os.MkdirAll(filepath.Dir(opts.LogFile), 0755); err != nil {
return nil, err
}
log, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return nil, err
}

closer := new(fileCloser)
closer.addFile(log)

logopt := docker.LogsOptions{
Container: id,
OutputStream: log,
ErrorStream: log,
Follow: true,
Stdout: true,
Stderr: true,
// Timestamps: true,
}
var wg wgCloseWaiter
wg.Add(1)
go func() {
logger.Debug("reading container logs")
defer wg.Done()
err := b.client.Logs(logopt)
if err != nil {
wg.err = err
logger.Error("container log stream failed", "err", err)
return
}
}()
closer.w = &wg
return closer, nil
}

type wgCloseWaiter struct {
sync.WaitGroup
err error
}

func (w *wgCloseWaiter) Wait() error {
w.WaitGroup.Wait()
return w.err
}

func (w *wgCloseWaiter) Close() error {
return nil
}

type flusher interface {
Flush() error
}

// fileCloser wraps a docker.CloseWaiter and closes all io.Closer instances held in it,
// after it is done waiting.
type fileCloser struct {
Expand Down Expand Up @@ -443,6 +501,9 @@ func (w *fileCloser) addFile(c io.Closer) {
func (w *fileCloser) closeFiles() {
w.closeOnce.Do(func() {
for _, closer := range w.closers {
if f, ok := closer.(flusher); ok {
f.Flush()
}
if err := closer.Close(); err != nil {
w.logger.Error("failed to close fd", "err", err)
}
Expand Down