From af87c547cfcd904f4c0f2ffd9dcfebd1ec36ba73 Mon Sep 17 00:00:00 2001 From: Colton Donnelly Date: Sun, 19 May 2024 21:02:44 -0400 Subject: [PATCH 1/2] go get watchman --- default.nix | 4 ++-- go.mod | 2 ++ go.sum | 4 ++++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/default.nix b/default.nix index 2ac449e..a6330a2 100644 --- a/default.nix +++ b/default.nix @@ -18,8 +18,8 @@ buildGoModule rec { nativeBuildInputs = [ installShellFiles ]; - vendorHash = "sha256-c53Af0X/TUFsxpV4YE525uh+yLG/t2tSmQkr9ElziBM="; - #vendorHash = lib.fakeHash; + vendorHash = "sha256-BLZ8WB4pVj9yon0eUbc3LBKb08kN7KWok6wE2esbyGw="; + # vendorHash = lib.fakeHash; postInstall = '' mv $out/bin/{src,process-compose} diff --git a/go.mod b/go.mod index 377029a..b254e83 100644 --- a/go.mod +++ b/go.mod @@ -31,8 +31,10 @@ replace github.com/cakturk/go-netstat => github.com/f1bonacc1/netstat v0.0.0-202 require ( github.com/InVisionApp/go-logger v1.0.1 // indirect github.com/KyleBanks/depth v1.2.1 // indirect + github.com/Microsoft/go-winio v0.6.2 // indirect github.com/bytedance/sonic v1.11.6 // indirect github.com/bytedance/sonic/loader v0.1.1 // indirect + github.com/cdmistman/watchman v0.2.1-0.20240521013409-16b1ae353832 // indirect github.com/cloudwego/base64x v0.1.4 // indirect github.com/cloudwego/iasm v0.2.0 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect diff --git a/go.sum b/go.sum index 6f7cabd..130a95f 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/InVisionApp/go-logger v1.0.1 h1:WFL19PViM1mHUmUWfsv5zMo379KSWj2MRmBlz github.com/InVisionApp/go-logger v1.0.1/go.mod h1:+cGTDSn+P8105aZkeOfIhdd7vFO5X1afUHcjvanY0L8= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= +github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= +github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/adrg/xdg v0.4.0 h1:RzRqFcjH4nE5C6oTAxhBtoE2IRyjBSa62SCbyPidvls= github.com/adrg/xdg v0.4.0/go.mod h1:N6ag73EX4wyxeaoeHctc1mas01KZgsj5tYiAIwqJE/E= @@ -17,6 +19,8 @@ github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM= github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/cdmistman/watchman v0.2.1-0.20240521013409-16b1ae353832 h1:Ax/3QlycSW6UV2Gcl1cWB0xfvHej85JhZYPJuQBD1gs= +github.com/cdmistman/watchman v0.2.1-0.20240521013409-16b1ae353832/go.mod h1:dWMrnZS+cSIn+wGnWneps2bPUWvwvFLKq2/iS4yJOa4= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= From cc1ee77819e9499a19fcdfc787e1c57471470a38 Mon Sep 17 00:00:00 2001 From: Colton Donnelly Date: Sun, 19 May 2024 21:05:44 -0400 Subject: [PATCH 2/2] feat: add watchman integration --- src/app/process.go | 58 ++++++++++-- src/app/project_runner.go | 15 ++-- src/app/watchman.go | 185 ++++++++++++++++++++++++++++++++++++++ src/types/process.go | 17 +++- 4 files changed, 256 insertions(+), 19 deletions(-) create mode 100644 src/app/watchman.go diff --git a/src/app/process.go b/src/app/process.go index 3f07ae2..7414b2e 100644 --- a/src/app/process.go +++ b/src/app/process.go @@ -5,8 +5,6 @@ import ( "context" "errors" "fmt" - "github.com/cakturk/go-netstat/netstat" - "github.com/f1bonacc1/process-compose/src/types" "io" "math/rand" "os" @@ -17,6 +15,9 @@ import ( "syscall" "time" + "github.com/cakturk/go-netstat/netstat" + "github.com/f1bonacc1/process-compose/src/types" + "github.com/f1bonacc1/process-compose/src/command" "github.com/f1bonacc1/process-compose/src/health" "github.com/f1bonacc1/process-compose/src/pclog" @@ -65,6 +66,13 @@ type Process struct { isMain bool extraArgs []string isStopped atomic.Bool + isDevRestart atomic.Bool + devWatchers []devWatch +} + +type devWatch struct { + sub *WatchmanSub + config *types.Watch } func NewProcess( @@ -77,6 +85,7 @@ func NewProcess( printLogs bool, isMain bool, extraArgs []string, + watchman *Watchman, ) *Process { colNumeric := rand.Intn(int(color.FgHiWhite)-int(color.FgHiBlack)) + int(color.FgHiBlack) @@ -104,6 +113,7 @@ func NewProcess( proc.setUpProbes() proc.procCond = *sync.NewCond(proc) proc.procStartedCond = *sync.NewCond(proc) + proc.setUpWatchman(watchman) return proc } @@ -139,9 +149,9 @@ func (p *Process) run() int { p.startProbes() - //Wait should wait for I/O consumption, but if the execution is too fast - //e.g. echo 'hello world' the output will not reach the pipe - //TODO Fix this + // Wait should wait for I/O consumption, but if the execution is too fast + // e.g. echo 'hello world' the output will not reach the pipe + // TODO Fix this time.Sleep(50 * time.Millisecond) _ = p.command.Wait() p.Lock() @@ -212,7 +222,6 @@ func (p *Process) getCommander() command.Commander { p.mergeExtraArgs(), ) } - } func (p *Process) mergeExtraArgs() []string { @@ -254,6 +263,11 @@ func (p *Process) isRestartable() bool { p.Lock() exitCode := p.getExitCode() p.Unlock() + + if p.isDevRestart.Swap(false) { + return true + } + if p.isStopped.Swap(false) { return false } @@ -381,9 +395,8 @@ func (p *Process) isRunning() bool { func (p *Process) prepareForShutDown() { // prevent restart during global shutdown or scale down - //p.procConf.RestartPolicy.Restart = types.RestartPolicyNo + // p.procConf.RestartPolicy.Restart = types.RestartPolicyNo p.isStopped.Store(true) - } func (p *Process) onProcessStart() { @@ -391,6 +404,26 @@ func (p *Process) onProcessStart() { p.logger.Open(p.getLogPath(), p.procConf.LoggerConfig) } + for _, watch := range p.devWatchers { + go func() { + for { + files, ok := watch.sub.Recv() + if !ok { + log. + Debug(). + Msg("watchman sub closed, exiting") + return + } + + if len(files) == 0 { + continue + } + + p.isDevRestart.Store(true) + } + }() + } + p.Lock() p.started = true p.Unlock() @@ -457,8 +490,8 @@ func (p *Process) updateProcState() { p.procState.Name = p.getName() } p.procState.IsRunning = isRunning - } + func (p *Process) setStartTime(startTime time.Time) { p.timeMutex.Lock() defer p.timeMutex.Unlock() @@ -663,6 +696,13 @@ func (p *Process) onReadinessCheckEnd(isOk, isFatal bool, err string) { } } +func (p *Process) setUpWatchman(watchman *Watchman) { + for _, config := range p.procConf.Watch { + recv := watchman.Subscribe(config) + p.devWatchers = append(p.devWatchers, devWatch{recv, config}) + } +} + func (p *Process) validateProcess() error { if isStringDefined(p.procConf.WorkingDir) { stat, err := os.Stat(p.procConf.WorkingDir) diff --git a/src/app/project_runner.go b/src/app/project_runner.go index 919bc1a..f0158fe 100644 --- a/src/app/project_runner.go +++ b/src/app/project_runner.go @@ -2,9 +2,6 @@ package app import ( "fmt" - "github.com/f1bonacc1/process-compose/src/config" - "github.com/f1bonacc1/process-compose/src/pclog" - "github.com/f1bonacc1/process-compose/src/types" "os" "os/user" "runtime" @@ -12,6 +9,10 @@ import ( "sync" "time" + "github.com/f1bonacc1/process-compose/src/config" + "github.com/f1bonacc1/process-compose/src/pclog" + "github.com/f1bonacc1/process-compose/src/types" + "github.com/rs/zerolog/log" ) @@ -32,6 +33,7 @@ type ProjectRunner struct { mainProcessArgs []string isTuiOn bool isOrderedShutDown bool + watchman Watchman } func (p *ProjectRunner) GetLexicographicProcessNames() ([]string, error) { @@ -67,7 +69,7 @@ func (p *ProjectRunner) Run() int { p.logger.Open(p.project.LogLocation, p.project.LoggerConfig) defer p.logger.Close() } - //zerolog.SetGlobalLevel(zerolog.PanicLevel) + // zerolog.SetGlobalLevel(zerolog.PanicLevel) log.Debug().Msgf("Spinning up %d processes. Order: %q", len(runOrder), nameOrder) for _, proc := range runOrder { newConf := proc @@ -108,6 +110,7 @@ func (p *ProjectRunner) runProcess(config *types.ProcessConfig) { printLogs, isMain, extraArgs, + &p.watchman, ) p.addRunningProcess(process) p.waitGroup.Add(1) @@ -128,7 +131,6 @@ func (p *ProjectRunner) runProcess(config *types.ProcessConfig) { func (p *ProjectRunner) waitIfNeeded(process *types.ProcessConfig) error { for k := range process.DependsOn { if runningProc := p.getRunningProcess(k); runningProc != nil { - switch process.DependsOn[k].Condition { case types.ProcessConditionCompleted: runningProc.waitForCompletion() @@ -158,7 +160,6 @@ func (p *ProjectRunner) waitIfNeeded(process *types.ProcessConfig) error { } else { log.Error().Msgf("Error: process %s depends on %s, but it isn't running", process.ReplicaName, k) } - } return nil } @@ -596,6 +597,7 @@ func (p *ProjectRunner) renameProcess(name string, newName string) { p.project.Processes[newName] = config } } + func (p *ProjectRunner) removeProcessLogs(name string) *pclog.ProcessLogBuffer { p.logsMutex.Lock() defer p.logsMutex.Unlock() @@ -724,7 +726,6 @@ func bToMb(b uint64) uint64 { } func NewProjectRunner(opts *ProjectOpts) (*ProjectRunner, error) { - hostname, err := os.Hostname() if err != nil { log.Err(err).Msg("Failed get hostname") diff --git a/src/app/watchman.go b/src/app/watchman.go new file mode 100644 index 0000000..3c3eb2b --- /dev/null +++ b/src/app/watchman.go @@ -0,0 +1,185 @@ +package app + +import ( + "fmt" + "os" + "path" + + "github.com/cdmistman/watchman" + "github.com/cdmistman/watchman/protocol" + "github.com/cdmistman/watchman/protocol/query" + "github.com/f1bonacc1/process-compose/src/types" + "github.com/rs/zerolog/log" +) + +type watchmanState int + +const ( + uninit watchmanState = iota + initing + errored + ready +) + +type Watchman struct { + subscriptions []*WatchmanSub +} + +// TODO: proper error handling +func (w *Watchman) MaybeStart() error { + if len(w.subscriptions) == 0 { + return nil + } + + globs := []string{} + for _, sub := range w.subscriptions { + globs = append(globs, sub.config.Glob) + } + + q := query.Query{ + Generators: query.Generators{query.GGlob: globs}, + Fields: query.Fields{query.FName}, + } + + client, err := watchman.Connect() + if err != nil { + return err + } + + if !client.HasCapability("glob_generator") { + return fmt.Errorf("watchman does not have the glob_generator capability") + } + + cwd, err := os.Getwd() + if err != nil { + return err + } + + watch, err := client.AddWatch(cwd) + if err != nil { + return err + } + + if watch.RelativePath() != "" { + if !client.HasCapability("relative_root") { + return fmt.Errorf("watchman does not have the relative_root capability but gave a relative root") + } + + q.RelativeRoot = watch.RelativePath() + } + + if _, err = watch.Subscribe("process-compose", &q); err != nil { + return err + } + + go loop(client, w.subscriptions) + return nil +} + +// assumes that all subscriptions have already been created +func loop(client *watchman.Client, subs []*WatchmanSub) { + for { + rawNotif, ok := <-client.Notifications() + if !ok { + log.Warn().Msg("watchman client shut down, exiting watch loop") + return + } + + notif, ok := rawNotif.(*protocol.Subscription) + if !ok { + log.Debug().Msg("got a non-subscription notification, ignoring") + } + + if notif.IsFreshInstance() { + log.Debug().Msg("fresh instance notification, ignoring") + } + + files := []string{} + for _, rawFile := range notif.Files() { + // if the query fields change, this will need to be a map[string]any + file := rawFile.(string) + files = append(files, file) + } + + for _, sub := range subs { + sub.updates <- files + } + } +} + +func (w *Watchman) Subscribe(config *types.Watch) *WatchmanSub { + ch := make(chan []string) + w.subscriptions = append(w.subscriptions, &WatchmanSub{ + updates: ch, + config: config, + }) + + return &WatchmanSub{ch, config} +} + +type WatchmanSub struct { + updates chan []string + config *types.Watch +} + +func (s *WatchmanSub) Recv() ([]string, bool) { + for { + files, ok := <-s.updates + if !ok { + return nil, false + } + + matches := []string{} + + Files: + for ii := 0; ii < len(files); ii++ { + file := files[ii] + + matched, err := path.Match(s.config.Glob, file) + if err != nil { + // TODO: close the channel? + log.Warn(). + Fields(map[string]any{ + "glob": s.config.Glob, + "path": file, + }). + Err(err). + Msg("invalid glob config, reporting as shut down") + + return nil, false + } else if !matched { + continue Files + } + + Ignore: + for jj := 0; jj < len(s.config.Ignore); jj++ { + matched, err := path.Match(s.config.Ignore[jj], file) + if err != nil { + log. + Warn(). + Fields(map[string]any{ + "glob": s.config.Glob, + "ignore": s.config.Ignore[jj], + "path": file, + }). + Err(err). + Msg("invalid ignore config, removing") + + s.config.Ignore = append(s.config.Ignore[:jj], s.config.Ignore[jj+1:]...) + jj -= 1 + continue Ignore + } + + if matched { + continue Files + } + } + + matches = append(matches, file) + } + + if len(matches) > 0 { + return matches, true + } + } +} diff --git a/src/types/process.go b/src/types/process.go index a2af2d6..3d28c46 100644 --- a/src/types/process.go +++ b/src/types/process.go @@ -2,16 +2,26 @@ package types import ( "fmt" - "github.com/f1bonacc1/process-compose/src/health" "math" "time" + + "github.com/f1bonacc1/process-compose/src/health" ) -const DefaultNamespace = "default" -const PlaceHolderValue = "-" +const ( + DefaultNamespace = "default" + PlaceHolderValue = "-" +) type Processes map[string]ProcessConfig + type Environment []string + +type Watch struct { + Glob string `yaml:"path"` + Ignore []string `yaml:"ignore"` +} + type ProcessConfig struct { Name string Disabled bool `yaml:"disabled,omitempty"` @@ -36,6 +46,7 @@ type ProcessConfig struct { Vars Vars `yaml:"vars"` IsForeground bool `yaml:"is_foreground"` IsTty bool `yaml:"is_tty"` + Watch []*Watch `yaml:"watch"` ReplicaNum int ReplicaName string Executable string