Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
int7 committed Oct 12, 2023
1 parent 0a0d25e commit ea97129
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 58 deletions.
30 changes: 20 additions & 10 deletions server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,26 +587,36 @@ func (svr *Service) HandleSSHListener(listener net.Listener) {

ss, err := NewSSHService(tcpConn, svr.sshConfig, pxyPayloadCh, replyCh)
if err != nil {
panic(err)
log.Error("new ssh service error: %v", err)
continue
}
ss.Run()

fn := func(v interface{}) {
replyCh <- v
}

go func() {
pxyCfg := <-pxyPayloadCh

vs, err := NewVirtualService(pxyCfg, ss, fn)
ctx := context.Background()

vs, err := NewVirtualService(
ctx,
v1.ClientCommonConfig{},
*svr.cfg,
svr.rc,
pxyCfg,
ss,
replyCh,
)
if err != nil {
panic(err)
log.Error("new virtual service error: %v", err)
ss.Close()
return
}
vs.InitResource(v1.ClientCommonConfig{}, *svr.cfg, svr.rc)

err = vs.Run(context.Background())
err = vs.Run(ctx)
if err != nil {
panic(err)
log.Error("proxy run error: %v", err)
vs.Close()
return
}
}()
}
Expand Down
54 changes: 27 additions & 27 deletions server/ssh_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ const (
RequestTypeHeartbeat = "[email protected]"
)

// 当 proxy 失败会返回该错误
type VProxyError struct {
}

// parse ssh client cmds input
type forwardedTCPPayload struct {
Addr string
Expand Down Expand Up @@ -54,7 +58,7 @@ type SSHExtraPayload struct {
type SSHService struct {
tcpConn net.Conn
cfg *ssh.ServerConfig

sshConn *ssh.ServerConn
gChannel <-chan ssh.NewChannel
gReq <-chan *ssh.Request
Expand Down Expand Up @@ -119,27 +123,25 @@ func (ss *SSHService) wait() {
ss.Close()
}

func (ss *SSHService) Exit() <-chan struct{} {
return ss.closeCh
}

func (ss *SSHService) Close() {
if atomic.LoadInt32(&ss.exit) == 1 {
return
}

select {
case _, ok := <-ss.closeCh:
if ok {
close(ss.closeCh)
}
case _, ok := <-ss.addrPayloadCh:
if ok {
close(ss.addrPayloadCh)
}
case _, ok := <-ss.extraPayloadCh:
if ok {
close(ss.extraPayloadCh)
}
case <-ss.closeCh:
return
default:
}

close(ss.closeCh)
close(ss.addrPayloadCh)
close(ss.extraPayloadCh)

ss.sshConn.Close()
ss.tcpConn.Close()

Expand All @@ -153,10 +155,11 @@ func (ss *SSHService) loopParseCmdPayload() {
select {
case req, ok := <-ss.gReq:
if !ok {
log.Warn("global request is close")
log.Info("global request is close")
ss.Close()
return
}

switch req.Type {
case RequestTypeForward:
var addrPayload SSHCmdPayload
Expand All @@ -166,10 +169,11 @@ func (ss *SSHService) loopParseCmdPayload() {
}
ss.addrPayloadCh <- addrPayload

// TODO
err := req.Reply(true, nil)
if err != nil {
log.Error("reply to ssh client error: %v", err)
if req.WantReply {
err := req.Reply(true, nil)
if err != nil {
log.Error("reply to ssh client error: %v", err)
}
}
default:
if req.Type == RequestTypeHeartbeat {
Expand Down Expand Up @@ -201,7 +205,7 @@ func (ss *SSHService) loopSendHeartbeat(ch ssh.Channel) {
}
continue
}
log.Info("heartbeat send success, ok: %v", ok)
log.Debug("heartbeat send success, ok: %v", ok)
case <-ss.closeCh:
return
}
Expand Down Expand Up @@ -252,9 +256,6 @@ func (ss *SSHService) loopParseExtraPayload() {
return
}

type VProxyError struct {
}

func (ss *SSHService) SSHConn() *ssh.ServerConn {
return ss.sshConn
}
Expand All @@ -275,7 +276,6 @@ func (ss *SSHService) loopReply() {
log.Error("run frp proxy error, close ssh service")
ss.Close()
}

}
}
}
Expand Down Expand Up @@ -421,10 +421,6 @@ func ParseHTTPCommand(params []string) (*HTTPCommand, error) {
return httpCmd, nil
}

type nullWriter struct{}

func (w *nullWriter) Write(p []byte) (n int, err error) { return len(p), nil }

type TCPCommand struct {
Address string
Port string
Expand Down Expand Up @@ -467,3 +463,7 @@ func ParseTCPCommand(params []string) (*TCPCommand, error) {
}
return tcpCmd, nil
}

type nullWriter struct{}

func (w *nullWriter) Write(p []byte) (n int, err error) { return len(p), nil }
49 changes: 28 additions & 21 deletions server/vclient_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,50 +41,48 @@ type VirtualService struct {
// call cancel to stop SSHService
cancel context.CancelFunc

callbackFn func(v interface{})
pxy proxy.Proxy
replyCh chan interface{}
pxy proxy.Proxy
}

func NewVirtualService(
ctx context.Context,
clientCfg v1.ClientCommonConfig,
serverCfg v1.ServerConfig,
rc *controller.ResourceController,
pxyCfg v1.ProxyConfigurer,
sshSvc *SSHService,
callbackFn func(v interface{}),
replyCh chan interface{},
) (svr *VirtualService, err error) {
svr = &VirtualService{
pxyCfg: pxyCfg,
cfg: clientCfg,
serverCfg: serverCfg,
rc: rc,

loginMsg: &msg.Login{
User: "ssh-frpc",
},

sshSvc: sshSvc,
pxyCfg: pxyCfg,

ctx: context.Background(),
ctx: ctx,
exit: 0,

callbackFn: callbackFn,
replyCh: replyCh,
}

svr.runID, err = util.RandID()
if err != nil {
return nil, err
}
return
}

func (svr *VirtualService) InitResource(clientCfg v1.ClientCommonConfig, serverCfg v1.ServerConfig, rc *controller.ResourceController) {
svr.cfg = clientCfg
svr.serverCfg = serverCfg
svr.rc = rc
go svr.loopCheck()

return
}

func (svr *VirtualService) Run(ctx context.Context) (err error) {
defer func() {
if err != nil {
svr.callbackFn(&VProxyError{})
}
}()

ctx, cancel := context.WithCancel(ctx)
svr.ctx = xlog.NewContext(ctx, xlog.New())
svr.cancel = cancel
Expand All @@ -111,14 +109,20 @@ func (svr *VirtualService) Close() {

func (svr *VirtualService) GracefulClose(d time.Duration) {
atomic.StoreUint32(&svr.exit, 1)

svr.callbackFn(&VProxyError{})

svr.pxy.Close()

if svr.cancel != nil {
svr.cancel()
}

svr.replyCh <- &VProxyError{}
}

func (svr *VirtualService) loopCheck() {
<-svr.sshSvc.Exit()

log.Info("virtual client service close")
svr.pxy.Close()
}

func (svr *VirtualService) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) {
Expand Down Expand Up @@ -154,10 +158,13 @@ func (svr *VirtualService) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr strin

remoteAddr, err = pxy.Run()
if err != nil {
log.Warn("proxy run error: %v", err)
return
}

defer func() {
if err != nil {
log.Warn("proxy close")
pxy.Close()
}
}()
Expand Down

0 comments on commit ea97129

Please sign in to comment.