Skip to content

Commit

Permalink
feat: mutiplex test success
Browse files Browse the repository at this point in the history
  • Loading branch information
int7 committed Oct 14, 2023
1 parent db4cfea commit 2d463cc
Show file tree
Hide file tree
Showing 12 changed files with 68 additions and 120 deletions.
14 changes: 0 additions & 14 deletions client/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import (
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/transport"
"github.com/fatedier/frp/pkg/util/log"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/xlog"
)

Expand Down Expand Up @@ -110,8 +108,6 @@ func NewControl(
authSetter: authSetter,
}

ctl.xl.Info("get pxy cfgs: %v", util.JsonDump(ctl.pxyCfgs))

ctl.msgTransporter = transport.NewMessageTransporter(ctl.sendCh)
ctl.pm = proxy.NewManager(ctl.ctx, clientCfg, ctl.msgTransporter)

Expand Down Expand Up @@ -142,32 +138,24 @@ func (ctl *Control) HandleReqWorkConn(_ *msg.ReqWorkConn) {
RunID: ctl.runID,
}

log.Info("client m: %#v", m)

if err = ctl.authSetter.SetNewWorkConn(m); err != nil {
xl.Warn("error during NewWorkConn authentication: %v", err)
return
}

log.Info("client m: %#v", m)

if err = msg.WriteMsg(workConn, m); err != nil {
xl.Warn("work connection write to server error: %v", err)
workConn.Close()
return
}

log.Info("write msg success")

var startMsg msg.StartWorkConn
if err = msg.ReadMsgInto(workConn, &startMsg); err != nil {
xl.Trace("work connection closed before response StartWorkConn message: %v", err)
workConn.Close()
return
}

log.Info("get start msg: %v", util.JsonDump(startMsg))

if startMsg.Error != "" {
xl.Error("StartWorkConn contains error: %s", startMsg.Error)
workConn.Close()
Expand All @@ -181,8 +169,6 @@ func (ctl *Control) HandleReqWorkConn(_ *msg.ReqWorkConn) {
func (ctl *Control) HandleNewProxyResp(inMsg *msg.NewProxyResp) {
xl := ctl.xl

xl.Info("proxy: %v, remote addr: %v, err: %v", inMsg.ProxyName, inMsg.RemoteAddr, inMsg.Error)

// Server will return NewProxyResp message to each NewProxy message.
// Start a new proxy handler if no error got
err := ctl.pm.StartProxy(inMsg.ProxyName, inMsg.RemoteAddr, inMsg.Error)
Expand Down
5 changes: 0 additions & 5 deletions client/proxy/proxy_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/transport"
"github.com/fatedier/frp/pkg/util/log"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/xlog"
)

Expand Down Expand Up @@ -98,7 +96,6 @@ func (pm *Manager) HandleEvent(payload interface{}) error {
switch e := payload.(type) {
case *event.StartProxyPayload:
m = e.NewProxyMsg
log.Info("proxy manager get a start proxy event: %v", util.JsonDump(m))
case *event.CloseProxyPayload:
m = e.CloseProxyMsg
default:
Expand Down Expand Up @@ -152,8 +149,6 @@ func (pm *Manager) Reload(pxyCfgs []v1.ProxyConfigurer) {
pm.proxies[name] = pxy
addPxyNames = append(addPxyNames, name)

xl.Info("proxy: %v call Start() function, cfg: %v", name, util.JsonDump(cfg))

pxy.Start()
}
}
Expand Down
3 changes: 0 additions & 3 deletions client/proxy/proxy_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/transport"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/xlog"
)

Expand Down Expand Up @@ -201,8 +200,6 @@ func (pw *Wrapper) checkWorker() {
pw.Cfg.MarshalToMsg(&newProxyMsg)
pw.lastSendStartMsg = now

xl.Info("proxy: %v generate a start proxy payload event to frps: %v", pw.Name, util.JsonDump(newProxyMsg))

_ = pw.handler(&event.StartProxyPayload{
NewProxyMsg: &newProxyMsg,
})
Expand Down
1 change: 0 additions & 1 deletion client/proxy/vproxy_manager.go

This file was deleted.

2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,3 @@ require (

// TODO(fatedier): Temporary use the modified version, update to the official version after merging into the official repository.
replace github.com/hashicorp/yamux => github.com/fatedier/yamux v0.0.0-20230628132301-7aca4898904d

// replace golang.org/x/crypto v0.11.0 => /Users/bytedance/private-project/go-dev/crypto
4 changes: 1 addition & 3 deletions pkg/config/legacy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ type ServerCommonConf struct {
BindAddr string `ini:"bind_addr" json:"bind_addr"`
// BindPort specifies the port that the server listens on. By default, this
// value is 7000.
BindPort int `ini:"bind_port" json:"bind_port" validate:"gte=0,lte=65535"`

SSHBindPort int `ini:"ssh_bind_port" json:"ssh_bind_port" validate:"gte=0,lte=65535"`
BindPort int `ini:"bind_port" json:"bind_port"`

// KCPBindPort specifies the KCP port that the server listens on. If this
// value is 0, the server will not listen for KCP connections. By default,
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ServerConfig struct {
BindAddr string `json:"bindAddr,omitempty"`
// BindPort specifies the port that the server listens on. By default, this
// value is 7000.
BindPort int `json:"bindPort,omitempty" validate:"gte=0,lte=65535"`
BindPort int `json:"bindPort,omitempty"`

SSHBindPort int `json:"sshBindPort,omitempty" validate:"gte=0,lte=65535"`
SSHPrivateKeyFilePath string `json:"sshPrivateKeyFilePath,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion server/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (pxy *BaseProxy) handleUserTCPConnection(userConn net.Conn) {

var workConn net.Conn
// try all connections from the pool
if pxy.GetLoginMsg().User == "ssh-frpc" {
if pxy.GetLoginMsg().User == "_frpc_ssh-client_" {
workConn, err = pxy.getWorkConnFn()
} else {
workConn, err = pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
Expand Down
8 changes: 0 additions & 8 deletions server/proxy/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,6 @@ func (pxy *TCPProxy) Run() (remoteAddr string, err error) {
pxy.listeners = append(pxy.listeners, l)
xl.Info("tcp proxy listen port [%d] in group [%s]", pxy.cfg.RemotePort, pxy.cfg.LoadBalancer.Group)
} else {

if pxy.rc == nil {
panic("rc is nil")
}
if pxy.rc.TCPPortManager == nil {
panic("tcp port manager is nil")
}

pxy.realBindPort, err = pxy.rc.TCPPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
if err != nil {
return
Expand Down
31 changes: 9 additions & 22 deletions server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ type Service struct {
sshListener net.Listener
sshConfig *ssh.ServerConfig

// sshConnCh chan frp_net.WrapSSHConn

// sshCtl *SSHControl

// Accept connections using kcp
kcpListener net.Listener

Expand Down Expand Up @@ -212,7 +208,8 @@ func NewService(cfg *v1.ServerConfig) (svr *Service, err error) {
if cfg.SSHBindPort > 0 {
svr.sshConfig = &ssh.ServerConfig{
PublicKeyCallback: func(conn ssh.ConnMetadata, key ssh.PublicKey) (*ssh.Permissions, error) {
authorizedKey, err := os.ReadFile(fmt.Sprintf("%v/%v.pub", cfg.SSHPublicKeyFilesPath, ssh.FingerprintSHA256(key)))
file := fmt.Sprintf("%v/%v.pub", cfg.SSHPublicKeyFilesPath, ssh.FingerprintSHA256(key))
authorizedKey, err := os.ReadFile(file)
if err != nil {
return nil, err
}
Expand All @@ -223,14 +220,11 @@ func NewService(cfg *v1.ServerConfig) (svr *Service, err error) {
}

if key.Type() == parsedAuthorizedKey.Type() && bytes.Equal(key.Marshal(), parsedAuthorizedKey.Marshal()) {
log.Info("ssh fingerprint sha256: %v", ssh.FingerprintSHA256(key))

// TODO 存储用户的登录信息到内存里,这样不用每次去读文件了,提升性能
log.Info("ssh file: %v fingerprint sha256: %v", file, ssh.FingerprintSHA256(key))

return &ssh.Permissions{
// You can store metadata on the Permissions struct.
Extensions: map[string]string{
"pubkey fingerprint": ssh.FingerprintSHA256(key),
ssh.FingerprintSHA256(key): string(authorizedKey),
},
}, nil
}
Expand All @@ -240,7 +234,7 @@ func NewService(cfg *v1.ServerConfig) (svr *Service, err error) {

privateBytes, err := os.ReadFile(cfg.SSHPrivateKeyFilePath)
if err != nil {
log.Error("Failed to load private key (./id_rsa)")
log.Error("Failed to load private key")
return nil, err
}

Expand All @@ -253,15 +247,12 @@ func NewService(cfg *v1.ServerConfig) (svr *Service, err error) {
svr.sshConfig.AddHostKey(private)

sshAddr := net.JoinHostPort(cfg.BindAddr, strconv.Itoa(cfg.SSHBindPort))
listener, err := net.Listen("tcp", sshAddr)
svr.sshListener, err = net.Listen("tcp", sshAddr)
if err != nil {
log.Error("Failed to listen on %v, error: %v", sshAddr, err)
return nil, err
}

log.Info("ssh server listening on %v", sshAddr)

svr.sshListener = listener
}

// Listen for accepting connections from client using kcp protocol.
Expand Down Expand Up @@ -448,13 +439,6 @@ func (svr *Service) Close() error {
return nil
}

// 处理 SSH 控制流+数据流的信息
func (svr *Service) handleSSHConnection(ctx context.Context, conn net.Conn) {
log.Info("set conn to ssh channel")
// svr.sshConnCh <- conn.(frp_net.WrapSSHConn)
log.Info("send chan done")
}

func (svr *Service) handleConnection(ctx context.Context, conn net.Conn) {
xl := xlog.FromContextSafe(ctx)

Expand Down Expand Up @@ -602,6 +586,9 @@ func (svr *Service) HandleSSHListener(listener net.Listener) {
ctx,
v1.ClientCommonConfig{},
*svr.cfg,
msg.Login{
User: SSHClientLoginUserPrefix + tcpConn.RemoteAddr().String(),
},
svr.rc,
pxyCfg,
ss,
Expand Down
Loading

0 comments on commit 2d463cc

Please sign in to comment.