Skip to content

Commit

Permalink
XHTTP client: Refactor "packet-up" mode, chasing "stream-up" (#4150)
Browse files Browse the repository at this point in the history
* Add wroteRequest (waiting for new quic-go)

* Use XTLS/quic-go instead

* Client doesn't need `scMaxConcurrentPosts` anymore

* GotConn is available in H3

* `scMaxConcurrentPosts` -> `scMaxBufferedPosts` (server only, 30 by default)

Fixes #4100
  • Loading branch information
RPRX authored Dec 11, 2024
1 parent 6be3c35 commit 8cd9a74
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 242 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/xtls/xray-core

go 1.21.4

replace github.com/quic-go/quic-go v0.46.0 => github.com/xtls/quic-go v0.46.2

require (
github.com/OmarTariq612/goech v0.0.0-20240405204721-8e2e1dafd3a0
github.com/cloudflare/circl v1.4.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
github.com/quic-go/quic-go v0.46.0 h1:uuwLClEEyk1DNvchH8uCByQVjo3yKL9opKulExNDs7Y=
github.com/quic-go/quic-go v0.46.0/go.mod h1:1dLehS7TIR64+vxGR70GDcatWTOtMX2PUtnKsjbTurI=
github.com/refraction-networking/utls v1.6.7 h1:zVJ7sP1dJx/WtVuITug3qYUq034cDq9B2MR1K67ULZM=
github.com/refraction-networking/utls v1.6.7/go.mod h1:BC3O4vQzye5hqpmDTWUqi4P5DDhzJfkV1tdqtawQIH0=
github.com/riobard/go-bloom v0.0.0-20200614022211-cdc8013cb5b3 h1:f/FNXud6gA3MNr8meMVVGxhp+QBTqY91tM8HjEuMjGg=
Expand All @@ -70,6 +68,8 @@ github.com/vishvananda/netlink v1.3.0 h1:X7l42GfcV4S6E4vHTsw48qbrV+9PVojNfIhZcwQ
github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn4UKG+hGqeJs=
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
github.com/xtls/quic-go v0.46.2 h1:bzUnZIQIH8SyqYGR6fAXzEvZauA+32J/2w2AtnEFa1o=
github.com/xtls/quic-go v0.46.2/go.mod h1:1dLehS7TIR64+vxGR70GDcatWTOtMX2PUtnKsjbTurI=
github.com/xtls/reality v0.0.0-20240712055506-48f0b2d5ed6d h1:+B97uD9uHLgAAulhigmys4BVwZZypzK7gPN3WtpgRJg=
github.com/xtls/reality v0.0.0-20240712055506-48f0b2d5ed6d/go.mod h1:dm4y/1QwzjGaK17ofi0Vs6NpKAHegZky8qk6J2JJZAE=
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
Expand Down
99 changes: 45 additions & 54 deletions infra/conf/transport_internet.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,35 +221,28 @@ func (c *HttpUpgradeConfig) Build() (proto.Message, error) {
type SplitHTTPConfig struct {
Host string `json:"host"`
Path string `json:"path"`
Mode string `json:"mode"`
Headers map[string]string `json:"headers"`
ScMaxConcurrentPosts *Int32Range `json:"scMaxConcurrentPosts"`
ScMaxEachPostBytes *Int32Range `json:"scMaxEachPostBytes"`
ScMinPostsIntervalMs *Int32Range `json:"scMinPostsIntervalMs"`
XPaddingBytes Int32Range `json:"xPaddingBytes"`
NoGRPCHeader bool `json:"noGRPCHeader"`
NoSSEHeader bool `json:"noSSEHeader"`
XPaddingBytes *Int32Range `json:"xPaddingBytes"`
ScMaxEachPostBytes Int32Range `json:"scMaxEachPostBytes"`
ScMinPostsIntervalMs Int32Range `json:"scMinPostsIntervalMs"`
ScMaxBufferedPosts int64 `json:"scMaxConcurrentPosts"`
KeepAlivePeriod int64 `json:"keepAlivePeriod"`
Xmux Xmux `json:"xmux"`
DownloadSettings *StreamConfig `json:"downloadSettings"`
Mode string `json:"mode"`
Extra json.RawMessage `json:"extra"`
NoGRPCHeader bool `json:"noGRPCHeader"`
KeepAlivePeriod int64 `json:"keepAlivePeriod"`
}

type Xmux struct {
MaxConcurrency *Int32Range `json:"maxConcurrency"`
MaxConnections *Int32Range `json:"maxConnections"`
CMaxReuseTimes *Int32Range `json:"cMaxReuseTimes"`
CMaxLifetimeMs *Int32Range `json:"cMaxLifetimeMs"`
MaxConcurrency Int32Range `json:"maxConcurrency"`
MaxConnections Int32Range `json:"maxConnections"`
CMaxReuseTimes Int32Range `json:"cMaxReuseTimes"`
CMaxLifetimeMs Int32Range `json:"cMaxLifetimeMs"`
}

func splithttpNewRandRangeConfig(input *Int32Range) *splithttp.RandRangeConfig {
if input == nil {
return &splithttp.RandRangeConfig{
From: 0,
To: 0,
}
}

func splithttpNewRandRangeConfig(input Int32Range) *splithttp.RandRangeConfig {
return &splithttp.RandRangeConfig{
From: input.From,
To: input.To,
Expand All @@ -270,69 +263,67 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) {
c = &extra
}

switch c.Mode {
case "":
c.Mode = "auto"
case "auto", "packet-up", "stream-up", "stream-one":
default:
return nil, errors.New("unsupported mode: " + c.Mode)
}

// Priority (client): host > serverName > address
for k := range c.Headers {
if strings.ToLower(k) == "host" {
return nil, errors.New(`"headers" can't contain "host"`)
}
}

if c.Xmux.MaxConnections != nil && c.Xmux.MaxConnections.To > 0 && c.Xmux.MaxConcurrency != nil && c.Xmux.MaxConcurrency.To > 0 {
if c.Xmux.MaxConnections.To > 0 && c.Xmux.MaxConcurrency.To > 0 {
return nil, errors.New("maxConnections cannot be specified together with maxConcurrency")
}

// Multiplexing config
muxProtobuf := splithttp.Multiplexing{
MaxConcurrency: splithttpNewRandRangeConfig(c.Xmux.MaxConcurrency),
MaxConnections: splithttpNewRandRangeConfig(c.Xmux.MaxConnections),
CMaxReuseTimes: splithttpNewRandRangeConfig(c.Xmux.CMaxReuseTimes),
CMaxLifetimeMs: splithttpNewRandRangeConfig(c.Xmux.CMaxLifetimeMs),
}

if muxProtobuf.MaxConcurrency.To == 0 &&
muxProtobuf.MaxConnections.To == 0 &&
muxProtobuf.CMaxReuseTimes.To == 0 &&
muxProtobuf.CMaxLifetimeMs.To == 0 {
muxProtobuf.MaxConcurrency.From = 16
muxProtobuf.MaxConcurrency.To = 32
muxProtobuf.CMaxReuseTimes.From = 64
muxProtobuf.CMaxReuseTimes.To = 128
}

switch c.Mode {
case "":
c.Mode = "auto"
case "auto", "packet-up", "stream-up", "stream-one":
default:
return nil, errors.New("unsupported mode: " + c.Mode)
if c.Xmux.MaxConcurrency.To == 0 &&
c.Xmux.MaxConnections.To == 0 &&
c.Xmux.CMaxReuseTimes.To == 0 &&
c.Xmux.CMaxLifetimeMs.To == 0 {
c.Xmux.MaxConcurrency.From = 16
c.Xmux.MaxConcurrency.To = 32
c.Xmux.CMaxReuseTimes.From = 64
c.Xmux.CMaxReuseTimes.To = 128
}

config := &splithttp.Config{
Path: c.Path,
Host: c.Host,
Header: c.Headers,
ScMaxConcurrentPosts: splithttpNewRandRangeConfig(c.ScMaxConcurrentPosts),
ScMaxEachPostBytes: splithttpNewRandRangeConfig(c.ScMaxEachPostBytes),
ScMinPostsIntervalMs: splithttpNewRandRangeConfig(c.ScMinPostsIntervalMs),
NoSSEHeader: c.NoSSEHeader,
XPaddingBytes: splithttpNewRandRangeConfig(c.XPaddingBytes),
Xmux: &muxProtobuf,
Path: c.Path,
Mode: c.Mode,
Headers: c.Headers,
XPaddingBytes: splithttpNewRandRangeConfig(c.XPaddingBytes),
NoGRPCHeader: c.NoGRPCHeader,
NoSSEHeader: c.NoSSEHeader,
ScMaxEachPostBytes: splithttpNewRandRangeConfig(c.ScMaxEachPostBytes),
ScMinPostsIntervalMs: splithttpNewRandRangeConfig(c.ScMinPostsIntervalMs),
ScMaxBufferedPosts: c.ScMaxBufferedPosts,
KeepAlivePeriod: c.KeepAlivePeriod,
Xmux: &splithttp.Multiplexing{
MaxConcurrency: splithttpNewRandRangeConfig(c.Xmux.MaxConcurrency),
MaxConnections: splithttpNewRandRangeConfig(c.Xmux.MaxConnections),
CMaxReuseTimes: splithttpNewRandRangeConfig(c.Xmux.CMaxReuseTimes),
CMaxLifetimeMs: splithttpNewRandRangeConfig(c.Xmux.CMaxLifetimeMs),
},
}
var err error

if c.DownloadSettings != nil {
if c.Mode == "stream-one" {
return nil, errors.New(`Can not use "downloadSettings" in "stream-one" mode.`)
}
if c.Extra != nil {
c.DownloadSettings.SocketSettings = nil
}
var err error
if config.DownloadSettings, err = c.DownloadSettings.Build(); err != nil {
return nil, errors.New(`Failed to build "downloadSettings".`).Base(err)
}
}

return config, nil
}

Expand Down
11 changes: 2 additions & 9 deletions transport/internet/splithttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,7 @@ func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string)
gotDownResponse.Close()
}()

if !c.isH3 {
// in quic-go, sometimes gotConn is never closed for the lifetime of
// the entire connection, and the download locks up
// https://github.com/quic-go/quic-go/issues/3342
// for other HTTP versions, we want to block Dial until we know the
// remote address of the server, for logging purposes
<-gotConn.Wait()
}
<-gotConn.Wait()

lazyDownload := &LazyReader{
CreateReader: func() (io.Reader, error) {
Expand All @@ -172,7 +165,7 @@ func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string)
}

func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string, payload io.ReadWriteCloser, contentLength int64) error {
req, err := http.NewRequest("POST", url, payload)
req, err := http.NewRequestWithContext(ctx, "POST", url, payload)
if err != nil {
return err
}
Expand Down
13 changes: 5 additions & 8 deletions transport/internet/splithttp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *Config) GetNormalizedQuery() string {

func (c *Config) GetRequestHeader() http.Header {
header := http.Header{}
for k, v := range c.Header {
for k, v := range c.Headers {
header.Add(k, v)
}

Expand All @@ -64,15 +64,12 @@ func (c *Config) WriteResponseHeader(writer http.ResponseWriter) {
}
}

func (c *Config) GetNormalizedScMaxConcurrentPosts() RandRangeConfig {
if c.ScMaxConcurrentPosts == nil || c.ScMaxConcurrentPosts.To == 0 {
return RandRangeConfig{
From: 100,
To: 100,
}
func (c *Config) GetNormalizedScMaxBufferedPosts() int {
if c.ScMaxBufferedPosts == 0 {
return 30
}

return *c.ScMaxConcurrentPosts
return int(c.ScMaxBufferedPosts)
}

func (c *Config) GetNormalizedScMaxEachPostBytes() RandRangeConfig {
Expand Down
Loading

0 comments on commit 8cd9a74

Please sign in to comment.