Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Watchman integration #188

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ buildGoModule rec {

nativeBuildInputs = [ installShellFiles ];

vendorHash = "sha256-c53Af0X/TUFsxpV4YE525uh+yLG/t2tSmQkr9ElziBM=";
#vendorHash = lib.fakeHash;
vendorHash = "sha256-BLZ8WB4pVj9yon0eUbc3LBKb08kN7KWok6wE2esbyGw=";
# vendorHash = lib.fakeHash;

postInstall = ''
mv $out/bin/{src,process-compose}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ replace github.com/cakturk/go-netstat => github.com/f1bonacc1/netstat v0.0.0-202
require (
github.com/InVisionApp/go-logger v1.0.1 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/bytedance/sonic v1.11.6 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cdmistman/watchman v0.2.1-0.20240521013409-16b1ae353832 // indirect
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this won't be pinned to a revision by the time i mark this pr as ready. once i finish this integration and clean up the library's api and do some more tests i'll cut a 0.3 release

github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/InVisionApp/go-logger v1.0.1 h1:WFL19PViM1mHUmUWfsv5zMo379KSWj2MRmBlz
github.com/InVisionApp/go-logger v1.0.1/go.mod h1:+cGTDSn+P8105aZkeOfIhdd7vFO5X1afUHcjvanY0L8=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/adrg/xdg v0.4.0 h1:RzRqFcjH4nE5C6oTAxhBtoE2IRyjBSa62SCbyPidvls=
github.com/adrg/xdg v0.4.0/go.mod h1:N6ag73EX4wyxeaoeHctc1mas01KZgsj5tYiAIwqJE/E=
Expand All @@ -17,6 +19,8 @@ github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc
github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4=
github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM=
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
github.com/cdmistman/watchman v0.2.1-0.20240521013409-16b1ae353832 h1:Ax/3QlycSW6UV2Gcl1cWB0xfvHej85JhZYPJuQBD1gs=
github.com/cdmistman/watchman v0.2.1-0.20240521013409-16b1ae353832/go.mod h1:dWMrnZS+cSIn+wGnWneps2bPUWvwvFLKq2/iS4yJOa4=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
Expand Down
58 changes: 49 additions & 9 deletions src/app/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"context"
"errors"
"fmt"
"github.com/cakturk/go-netstat/netstat"
"github.com/f1bonacc1/process-compose/src/types"
"io"
"math/rand"
"os"
Expand All @@ -17,6 +15,9 @@ import (
"syscall"
"time"

"github.com/cakturk/go-netstat/netstat"
"github.com/f1bonacc1/process-compose/src/types"

"github.com/f1bonacc1/process-compose/src/command"
"github.com/f1bonacc1/process-compose/src/health"
"github.com/f1bonacc1/process-compose/src/pclog"
Expand Down Expand Up @@ -65,6 +66,13 @@ type Process struct {
isMain bool
extraArgs []string
isStopped atomic.Bool
isDevRestart atomic.Bool
devWatchers []devWatch
}

type devWatch struct {
sub *WatchmanSub
config *types.Watch
}

func NewProcess(
Expand All @@ -77,6 +85,7 @@ func NewProcess(
printLogs bool,
isMain bool,
extraArgs []string,
watchman *Watchman,
) *Process {
colNumeric := rand.Intn(int(color.FgHiWhite)-int(color.FgHiBlack)) + int(color.FgHiBlack)

Expand Down Expand Up @@ -104,6 +113,7 @@ func NewProcess(
proc.setUpProbes()
proc.procCond = *sync.NewCond(proc)
proc.procStartedCond = *sync.NewCond(proc)
proc.setUpWatchman(watchman)
return proc
}

Expand Down Expand Up @@ -139,9 +149,9 @@ func (p *Process) run() int {

p.startProbes()

//Wait should wait for I/O consumption, but if the execution is too fast
//e.g. echo 'hello world' the output will not reach the pipe
//TODO Fix this
// Wait should wait for I/O consumption, but if the execution is too fast
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry i have gofumpt enabled with gopls 😅

// e.g. echo 'hello world' the output will not reach the pipe
// TODO Fix this
time.Sleep(50 * time.Millisecond)
_ = p.command.Wait()
p.Lock()
Expand Down Expand Up @@ -212,7 +222,6 @@ func (p *Process) getCommander() command.Commander {
p.mergeExtraArgs(),
)
}

}

func (p *Process) mergeExtraArgs() []string {
Expand Down Expand Up @@ -254,6 +263,11 @@ func (p *Process) isRestartable() bool {
p.Lock()
exitCode := p.getExitCode()
p.Unlock()

if p.isDevRestart.Swap(false) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i needed a new signal for when the restart is associated with a watch. i thought about changing isStopped to some stopKind enum ie:

type stopKind u8

const (
  notStopped stopKind = iota
  isStopped
  isDevRestart
)

this is probably still the better solution (especially since p.isStopped.Swap(false) is missing in this if)

return true
}

if p.isStopped.Swap(false) {
return false
}
Expand Down Expand Up @@ -381,16 +395,35 @@ func (p *Process) isRunning() bool {

func (p *Process) prepareForShutDown() {
// prevent restart during global shutdown or scale down
//p.procConf.RestartPolicy.Restart = types.RestartPolicyNo
// p.procConf.RestartPolicy.Restart = types.RestartPolicyNo
p.isStopped.Store(true)

}

func (p *Process) onProcessStart() {
if isStringDefined(p.procConf.LogLocation) {
p.logger.Open(p.getLogPath(), p.procConf.LoggerConfig)
}

for _, watch := range p.devWatchers {
go func() {
for {
files, ok := watch.sub.Recv()
if !ok {
log.
Debug().
Msg("watchman sub closed, exiting")
return
}

if len(files) == 0 {
continue
}

p.isDevRestart.Store(true)
}
}()
}

p.Lock()
p.started = true
p.Unlock()
Expand Down Expand Up @@ -457,8 +490,8 @@ func (p *Process) updateProcState() {
p.procState.Name = p.getName()
}
p.procState.IsRunning = isRunning

}

func (p *Process) setStartTime(startTime time.Time) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i should use this value to reset the watchman subscription clock

p.timeMutex.Lock()
defer p.timeMutex.Unlock()
Expand Down Expand Up @@ -663,6 +696,13 @@ func (p *Process) onReadinessCheckEnd(isOk, isFatal bool, err string) {
}
}

func (p *Process) setUpWatchman(watchman *Watchman) {
for _, config := range p.procConf.Watch {
recv := watchman.Subscribe(config)
p.devWatchers = append(p.devWatchers, devWatch{recv, config})
}
}

func (p *Process) validateProcess() error {
if isStringDefined(p.procConf.WorkingDir) {
stat, err := os.Stat(p.procConf.WorkingDir)
Expand Down
15 changes: 8 additions & 7 deletions src/app/project_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package app

import (
"fmt"
"github.com/f1bonacc1/process-compose/src/config"
"github.com/f1bonacc1/process-compose/src/pclog"
"github.com/f1bonacc1/process-compose/src/types"
"os"
"os/user"
"runtime"
"slices"
"sync"
"time"

"github.com/f1bonacc1/process-compose/src/config"
"github.com/f1bonacc1/process-compose/src/pclog"
"github.com/f1bonacc1/process-compose/src/types"

"github.com/rs/zerolog/log"
)

Expand All @@ -32,6 +33,7 @@ type ProjectRunner struct {
mainProcessArgs []string
isTuiOn bool
isOrderedShutDown bool
watchman Watchman
}

func (p *ProjectRunner) GetLexicographicProcessNames() ([]string, error) {
Expand Down Expand Up @@ -67,7 +69,7 @@ func (p *ProjectRunner) Run() int {
p.logger.Open(p.project.LogLocation, p.project.LoggerConfig)
defer p.logger.Close()
}
//zerolog.SetGlobalLevel(zerolog.PanicLevel)
// zerolog.SetGlobalLevel(zerolog.PanicLevel)
log.Debug().Msgf("Spinning up %d processes. Order: %q", len(runOrder), nameOrder)
for _, proc := range runOrder {
newConf := proc
Expand Down Expand Up @@ -108,6 +110,7 @@ func (p *ProjectRunner) runProcess(config *types.ProcessConfig) {
printLogs,
isMain,
extraArgs,
&p.watchman,
)
p.addRunningProcess(process)
p.waitGroup.Add(1)
Expand All @@ -128,7 +131,6 @@ func (p *ProjectRunner) runProcess(config *types.ProcessConfig) {
func (p *ProjectRunner) waitIfNeeded(process *types.ProcessConfig) error {
for k := range process.DependsOn {
if runningProc := p.getRunningProcess(k); runningProc != nil {

switch process.DependsOn[k].Condition {
case types.ProcessConditionCompleted:
runningProc.waitForCompletion()
Expand Down Expand Up @@ -158,7 +160,6 @@ func (p *ProjectRunner) waitIfNeeded(process *types.ProcessConfig) error {
} else {
log.Error().Msgf("Error: process %s depends on %s, but it isn't running", process.ReplicaName, k)
}

}
return nil
}
Expand Down Expand Up @@ -596,6 +597,7 @@ func (p *ProjectRunner) renameProcess(name string, newName string) {
p.project.Processes[newName] = config
}
}

func (p *ProjectRunner) removeProcessLogs(name string) *pclog.ProcessLogBuffer {
p.logsMutex.Lock()
defer p.logsMutex.Unlock()
Expand Down Expand Up @@ -724,7 +726,6 @@ func bToMb(b uint64) uint64 {
}

func NewProjectRunner(opts *ProjectOpts) (*ProjectRunner, error) {

hostname, err := os.Hostname()
if err != nil {
log.Err(err).Msg("Failed get hostname")
Expand Down
Loading
Loading