From 2d463cc4335c78a605a6050757b7d116068e5acd Mon Sep 17 00:00:00 2001 From: int7 Date: Fri, 13 Oct 2023 09:53:29 +0800 Subject: [PATCH] feat: mutiplex test success --- client/control.go | 14 ----- client/proxy/proxy_manager.go | 5 -- client/proxy/proxy_wrapper.go | 3 -- client/proxy/vproxy_manager.go | 1 - go.mod | 2 - pkg/config/legacy/server.go | 4 +- pkg/config/v1/server.go | 2 +- server/proxy/proxy.go | 2 +- server/proxy/tcp.go | 8 --- server/service.go | 31 ++++------- server/ssh_service.go | 96 +++++++++++++++++----------------- server/vclient_service.go | 20 +++---- 12 files changed, 68 insertions(+), 120 deletions(-) delete mode 100644 client/proxy/vproxy_manager.go diff --git a/client/control.go b/client/control.go index f640971f19a..5ef26c8b7b5 100644 --- a/client/control.go +++ b/client/control.go @@ -31,8 +31,6 @@ import ( v1 "github.com/fatedier/frp/pkg/config/v1" "github.com/fatedier/frp/pkg/msg" "github.com/fatedier/frp/pkg/transport" - "github.com/fatedier/frp/pkg/util/log" - "github.com/fatedier/frp/pkg/util/util" "github.com/fatedier/frp/pkg/util/xlog" ) @@ -110,8 +108,6 @@ func NewControl( authSetter: authSetter, } - ctl.xl.Info("get pxy cfgs: %v", util.JsonDump(ctl.pxyCfgs)) - ctl.msgTransporter = transport.NewMessageTransporter(ctl.sendCh) ctl.pm = proxy.NewManager(ctl.ctx, clientCfg, ctl.msgTransporter) @@ -142,23 +138,17 @@ func (ctl *Control) HandleReqWorkConn(_ *msg.ReqWorkConn) { RunID: ctl.runID, } - log.Info("client m: %#v", m) - if err = ctl.authSetter.SetNewWorkConn(m); err != nil { xl.Warn("error during NewWorkConn authentication: %v", err) return } - log.Info("client m: %#v", m) - if err = msg.WriteMsg(workConn, m); err != nil { xl.Warn("work connection write to server error: %v", err) workConn.Close() return } - log.Info("write msg success") - var startMsg msg.StartWorkConn if err = msg.ReadMsgInto(workConn, &startMsg); err != nil { xl.Trace("work connection closed before response StartWorkConn message: %v", err) @@ -166,8 +156,6 @@ func (ctl *Control) HandleReqWorkConn(_ *msg.ReqWorkConn) { return } - log.Info("get start msg: %v", util.JsonDump(startMsg)) - if startMsg.Error != "" { xl.Error("StartWorkConn contains error: %s", startMsg.Error) workConn.Close() @@ -181,8 +169,6 @@ func (ctl *Control) HandleReqWorkConn(_ *msg.ReqWorkConn) { func (ctl *Control) HandleNewProxyResp(inMsg *msg.NewProxyResp) { xl := ctl.xl - xl.Info("proxy: %v, remote addr: %v, err: %v", inMsg.ProxyName, inMsg.RemoteAddr, inMsg.Error) - // Server will return NewProxyResp message to each NewProxy message. // Start a new proxy handler if no error got err := ctl.pm.StartProxy(inMsg.ProxyName, inMsg.RemoteAddr, inMsg.Error) diff --git a/client/proxy/proxy_manager.go b/client/proxy/proxy_manager.go index 21f7440eae3..db66cb26397 100644 --- a/client/proxy/proxy_manager.go +++ b/client/proxy/proxy_manager.go @@ -27,8 +27,6 @@ import ( v1 "github.com/fatedier/frp/pkg/config/v1" "github.com/fatedier/frp/pkg/msg" "github.com/fatedier/frp/pkg/transport" - "github.com/fatedier/frp/pkg/util/log" - "github.com/fatedier/frp/pkg/util/util" "github.com/fatedier/frp/pkg/util/xlog" ) @@ -98,7 +96,6 @@ func (pm *Manager) HandleEvent(payload interface{}) error { switch e := payload.(type) { case *event.StartProxyPayload: m = e.NewProxyMsg - log.Info("proxy manager get a start proxy event: %v", util.JsonDump(m)) case *event.CloseProxyPayload: m = e.CloseProxyMsg default: @@ -152,8 +149,6 @@ func (pm *Manager) Reload(pxyCfgs []v1.ProxyConfigurer) { pm.proxies[name] = pxy addPxyNames = append(addPxyNames, name) - xl.Info("proxy: %v call Start() function, cfg: %v", name, util.JsonDump(cfg)) - pxy.Start() } } diff --git a/client/proxy/proxy_wrapper.go b/client/proxy/proxy_wrapper.go index 80e01bee959..d539a83594c 100644 --- a/client/proxy/proxy_wrapper.go +++ b/client/proxy/proxy_wrapper.go @@ -30,7 +30,6 @@ import ( v1 "github.com/fatedier/frp/pkg/config/v1" "github.com/fatedier/frp/pkg/msg" "github.com/fatedier/frp/pkg/transport" - "github.com/fatedier/frp/pkg/util/util" "github.com/fatedier/frp/pkg/util/xlog" ) @@ -201,8 +200,6 @@ func (pw *Wrapper) checkWorker() { pw.Cfg.MarshalToMsg(&newProxyMsg) pw.lastSendStartMsg = now - xl.Info("proxy: %v generate a start proxy payload event to frps: %v", pw.Name, util.JsonDump(newProxyMsg)) - _ = pw.handler(&event.StartProxyPayload{ NewProxyMsg: &newProxyMsg, }) diff --git a/client/proxy/vproxy_manager.go b/client/proxy/vproxy_manager.go deleted file mode 100644 index 943b369ffeb..00000000000 --- a/client/proxy/vproxy_manager.go +++ /dev/null @@ -1 +0,0 @@ -package proxy diff --git a/go.mod b/go.mod index bb55a98bc62..8f74dad05ac 100644 --- a/go.mod +++ b/go.mod @@ -81,5 +81,3 @@ require ( // TODO(fatedier): Temporary use the modified version, update to the official version after merging into the official repository. replace github.com/hashicorp/yamux => github.com/fatedier/yamux v0.0.0-20230628132301-7aca4898904d - -// replace golang.org/x/crypto v0.11.0 => /Users/bytedance/private-project/go-dev/crypto diff --git a/pkg/config/legacy/server.go b/pkg/config/legacy/server.go index 859a16cd463..d3cfcb2f995 100644 --- a/pkg/config/legacy/server.go +++ b/pkg/config/legacy/server.go @@ -41,9 +41,7 @@ type ServerCommonConf struct { BindAddr string `ini:"bind_addr" json:"bind_addr"` // BindPort specifies the port that the server listens on. By default, this // value is 7000. - BindPort int `ini:"bind_port" json:"bind_port" validate:"gte=0,lte=65535"` - - SSHBindPort int `ini:"ssh_bind_port" json:"ssh_bind_port" validate:"gte=0,lte=65535"` + BindPort int `ini:"bind_port" json:"bind_port"` // KCPBindPort specifies the KCP port that the server listens on. If this // value is 0, the server will not listen for KCP connections. By default, diff --git a/pkg/config/v1/server.go b/pkg/config/v1/server.go index 535bc1c1580..7d3a2deb4ee 100644 --- a/pkg/config/v1/server.go +++ b/pkg/config/v1/server.go @@ -30,7 +30,7 @@ type ServerConfig struct { BindAddr string `json:"bindAddr,omitempty"` // BindPort specifies the port that the server listens on. By default, this // value is 7000. - BindPort int `json:"bindPort,omitempty" validate:"gte=0,lte=65535"` + BindPort int `json:"bindPort,omitempty"` SSHBindPort int `json:"sshBindPort,omitempty" validate:"gte=0,lte=65535"` SSHPrivateKeyFilePath string `json:"sshPrivateKeyFilePath,omitempty"` diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go index e1f676c25b4..45a7aa18670 100644 --- a/server/proxy/proxy.go +++ b/server/proxy/proxy.go @@ -231,7 +231,7 @@ func (pxy *BaseProxy) handleUserTCPConnection(userConn net.Conn) { var workConn net.Conn // try all connections from the pool - if pxy.GetLoginMsg().User == "ssh-frpc" { + if pxy.GetLoginMsg().User == "_frpc_ssh-client_" { workConn, err = pxy.getWorkConnFn() } else { workConn, err = pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr()) diff --git a/server/proxy/tcp.go b/server/proxy/tcp.go index 6c8a9ee1fc2..69cdc70bbae 100644 --- a/server/proxy/tcp.go +++ b/server/proxy/tcp.go @@ -64,14 +64,6 @@ func (pxy *TCPProxy) Run() (remoteAddr string, err error) { pxy.listeners = append(pxy.listeners, l) xl.Info("tcp proxy listen port [%d] in group [%s]", pxy.cfg.RemotePort, pxy.cfg.LoadBalancer.Group) } else { - - if pxy.rc == nil { - panic("rc is nil") - } - if pxy.rc.TCPPortManager == nil { - panic("tcp port manager is nil") - } - pxy.realBindPort, err = pxy.rc.TCPPortManager.Acquire(pxy.name, pxy.cfg.RemotePort) if err != nil { return diff --git a/server/service.go b/server/service.go index cee15edaab9..5f6fed1f2d6 100644 --- a/server/service.go +++ b/server/service.go @@ -72,10 +72,6 @@ type Service struct { sshListener net.Listener sshConfig *ssh.ServerConfig - // sshConnCh chan frp_net.WrapSSHConn - - // sshCtl *SSHControl - // Accept connections using kcp kcpListener net.Listener @@ -212,7 +208,8 @@ func NewService(cfg *v1.ServerConfig) (svr *Service, err error) { if cfg.SSHBindPort > 0 { svr.sshConfig = &ssh.ServerConfig{ PublicKeyCallback: func(conn ssh.ConnMetadata, key ssh.PublicKey) (*ssh.Permissions, error) { - authorizedKey, err := os.ReadFile(fmt.Sprintf("%v/%v.pub", cfg.SSHPublicKeyFilesPath, ssh.FingerprintSHA256(key))) + file := fmt.Sprintf("%v/%v.pub", cfg.SSHPublicKeyFilesPath, ssh.FingerprintSHA256(key)) + authorizedKey, err := os.ReadFile(file) if err != nil { return nil, err } @@ -223,14 +220,11 @@ func NewService(cfg *v1.ServerConfig) (svr *Service, err error) { } if key.Type() == parsedAuthorizedKey.Type() && bytes.Equal(key.Marshal(), parsedAuthorizedKey.Marshal()) { - log.Info("ssh fingerprint sha256: %v", ssh.FingerprintSHA256(key)) - - // TODO 存储用户的登录信息到内存里,这样不用每次去读文件了,提升性能 + log.Info("ssh file: %v fingerprint sha256: %v", file, ssh.FingerprintSHA256(key)) return &ssh.Permissions{ - // You can store metadata on the Permissions struct. Extensions: map[string]string{ - "pubkey fingerprint": ssh.FingerprintSHA256(key), + ssh.FingerprintSHA256(key): string(authorizedKey), }, }, nil } @@ -240,7 +234,7 @@ func NewService(cfg *v1.ServerConfig) (svr *Service, err error) { privateBytes, err := os.ReadFile(cfg.SSHPrivateKeyFilePath) if err != nil { - log.Error("Failed to load private key (./id_rsa)") + log.Error("Failed to load private key") return nil, err } @@ -253,15 +247,12 @@ func NewService(cfg *v1.ServerConfig) (svr *Service, err error) { svr.sshConfig.AddHostKey(private) sshAddr := net.JoinHostPort(cfg.BindAddr, strconv.Itoa(cfg.SSHBindPort)) - listener, err := net.Listen("tcp", sshAddr) + svr.sshListener, err = net.Listen("tcp", sshAddr) if err != nil { log.Error("Failed to listen on %v, error: %v", sshAddr, err) return nil, err } - log.Info("ssh server listening on %v", sshAddr) - - svr.sshListener = listener } // Listen for accepting connections from client using kcp protocol. @@ -448,13 +439,6 @@ func (svr *Service) Close() error { return nil } -// 处理 SSH 控制流+数据流的信息 -func (svr *Service) handleSSHConnection(ctx context.Context, conn net.Conn) { - log.Info("set conn to ssh channel") - // svr.sshConnCh <- conn.(frp_net.WrapSSHConn) - log.Info("send chan done") -} - func (svr *Service) handleConnection(ctx context.Context, conn net.Conn) { xl := xlog.FromContextSafe(ctx) @@ -602,6 +586,9 @@ func (svr *Service) HandleSSHListener(listener net.Listener) { ctx, v1.ClientCommonConfig{}, *svr.cfg, + msg.Login{ + User: SSHClientLoginUserPrefix + tcpConn.RemoteAddr().String(), + }, svr.rc, pxyCfg, ss, diff --git a/server/ssh_service.go b/server/ssh_service.go index 15c8b279088..538aaea038a 100644 --- a/server/ssh_service.go +++ b/server/ssh_service.go @@ -16,6 +16,7 @@ import ( v1 "github.com/fatedier/frp/pkg/config/v1" "github.com/fatedier/frp/pkg/util/log" "github.com/fatedier/frp/pkg/util/util" + gerror "github.com/fatedier/golib/errors" "golang.org/x/crypto/ssh" ) @@ -23,6 +24,7 @@ const ( ChannelTypeServerOpenChannel = "forwarded-tcpip" RequestTypeForward = "tcpip-forward" RequestTypeHeartbeat = "keepalive@openssh.com" + SSHClientLoginUserPrefix = "_frpc_ssh-client_" ) // 当 proxy 失败会返回该错误 @@ -69,9 +71,6 @@ type SSHService struct { proxyPayloadCh chan v1.ProxyConfigurer replyCh chan interface{} - // TODO 是否可以用这个来确定哪个 request 需要 reply,然后reply完成之后,从map里删除掉? - needReplyReqs map[string]*ssh.Request - mu sync.RWMutex closeCh chan struct{} @@ -116,11 +115,6 @@ func (ss *SSHService) Run() { go ss.loopReply() } -func (ss *SSHService) wait() { - ss.sshConn.Wait() - ss.Close() -} - func (ss *SSHService) Exit() <-chan struct{} { return ss.closeCh } @@ -140,6 +134,8 @@ func (ss *SSHService) Close() { close(ss.addrPayloadCh) close(ss.extraPayloadCh) + _ = ss.sshConn.Wait() + ss.sshConn.Close() ss.tcpConn.Close() @@ -165,14 +161,9 @@ func (ss *SSHService) loopParseCmdPayload() { log.Error("ssh unmarshal error: %v", err) return } - ss.addrPayloadCh <- addrPayload - - if req.WantReply { - err := req.Reply(true, nil) - if err != nil { - log.Error("reply to ssh client error: %v", err) - } - } + _ = gerror.PanicToError(func() { + ss.addrPayloadCh <- addrPayload + }) default: if req.Type == RequestTypeHeartbeat { log.Debug("ssh heartbeat data") @@ -180,6 +171,12 @@ func (ss *SSHService) loopParseCmdPayload() { log.Info("default req, data: %v", util.JsonDump(req)) } } + if req.WantReply { + err := req.Reply(true, nil) + if err != nil { + log.Error("reply to ssh client error: %v", err) + } + } case <-ss.closeCh: log.Info("loop parse cmd payload close") return @@ -222,36 +219,33 @@ func (ss *SSHService) loopParseExtraPayload() { go ss.loopSendHeartbeat(ch) - for r := range req { - if len(r.Payload) <= 4 { - log.Info("r.payload is less than 4") - continue - } - - dataLen := binary.BigEndian.Uint32(r.Payload[:4]) - p := string(r.Payload[4 : 4+dataLen]) + go func(req <-chan *ssh.Request) { + for r := range req { + if len(r.Payload) <= 4 { + log.Info("r.payload is less than 4") + continue + } - if !strings.Contains(p, "frpc") { - log.Info("ssh protocol exchange data: %v", p) - continue - } + dataLen := binary.BigEndian.Uint32(r.Payload[:4]) + p := string(r.Payload[4 : 4+dataLen]) - log.Info("get p: %v", p) + if !strings.Contains(p, "frpc") { + log.Info("payload not contains frp keyword: %v", p) + continue + } - msg, err := parseSSHExtraMessage(p) - if err != nil { - log.Error("parse ssh extra message error: %v, payload: %v", err, r.Payload) - continue + msg, err := parseSSHExtraMessage(p) + if err != nil { + log.Error("parse ssh extra message error: %v, payload: %v", err, r.Payload) + continue + } + _ = gerror.PanicToError(func() { + ss.extraPayloadCh <- msg + }) + return } - log.Info("try to send extraPayload") - ss.extraPayloadCh <- msg - log.Info("send extraPayload success") - return - } + }(req) } - - log.Error("cannot get ssh extra payload") - return } func (ss *SSHService) SSHConn() *ssh.ServerConn { @@ -322,14 +316,20 @@ func (ss *SSHService) loopGenerateProxy() { return } - // TODO 判断是什么类型的服务,然后赋值 - ss.proxyPayloadCh <- &v1.TCPProxyConfig{ - ProxyBaseConfig: v1.ProxyBaseConfig{ - Name: fmt.Sprintf("ssh-frpc-%v", time.Now().Format("2006-01-02 15:04:05")), - Type: p2.Type, - }, - RemotePort: int(p1.Port), + switch p2.Type { + case "http": + case "tcp": + ss.proxyPayloadCh <- &v1.TCPProxyConfig{ + ProxyBaseConfig: v1.ProxyBaseConfig{ + Name: fmt.Sprintf("ssh-proxy-%v-%v", ss.tcpConn.RemoteAddr().String(), time.Now().UnixNano()), + Type: p2.Type, + }, + RemotePort: int(p1.Port), + } + default: + log.Warn("invalid frp proxy type: %v", p2.Type) } + } } diff --git a/server/vclient_service.go b/server/vclient_service.go index 84e026042ad..0b1cb13575b 100644 --- a/server/vclient_service.go +++ b/server/vclient_service.go @@ -22,7 +22,7 @@ import ( // VirtualService is a client VirtualService run in frps type VirtualService struct { - cfg v1.ClientCommonConfig + clientCfg v1.ClientCommonConfig pxyCfg v1.ProxyConfigurer serverCfg v1.ServerConfig @@ -49,19 +49,18 @@ func NewVirtualService( ctx context.Context, clientCfg v1.ClientCommonConfig, serverCfg v1.ServerConfig, + logMsg msg.Login, rc *controller.ResourceController, pxyCfg v1.ProxyConfigurer, sshSvc *SSHService, replyCh chan interface{}, ) (svr *VirtualService, err error) { svr = &VirtualService{ - cfg: clientCfg, + clientCfg: clientCfg, serverCfg: serverCfg, rc: rc, - loginMsg: &msg.Login{ - User: "ssh-frpc", - }, + loginMsg: &logMsg, sshSvc: sshSvc, pxyCfg: pxyCfg, @@ -120,9 +119,8 @@ func (svr *VirtualService) GracefulClose(d time.Duration) { func (svr *VirtualService) loopCheck() { <-svr.sshSvc.Exit() - - log.Info("virtual client service close") svr.pxy.Close() + log.Info("virtual client service close") } func (svr *VirtualService) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) { @@ -139,7 +137,7 @@ func (svr *VirtualService) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr strin RunID: svr.runID, } - pxy, err := proxy.NewProxy(svr.ctx, &proxy.Options{ + svr.pxy, err = proxy.NewProxy(svr.ctx, &proxy.Options{ LoginMsg: svr.loginMsg, UserInfo: userInfo, Configurer: pxyConf, @@ -154,9 +152,7 @@ func (svr *VirtualService) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr strin return remoteAddr, err } - svr.pxy = pxy - - remoteAddr, err = pxy.Run() + remoteAddr, err = svr.pxy.Run() if err != nil { log.Warn("proxy run error: %v", err) return @@ -165,7 +161,7 @@ func (svr *VirtualService) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr strin defer func() { if err != nil { log.Warn("proxy close") - pxy.Close() + svr.pxy.Close() } }()