Skip to content

Commit

Permalink
XHTTP: Update logging
Browse files Browse the repository at this point in the history
  • Loading branch information
RPRX authored Dec 12, 2024
1 parent 743435d commit 5722488
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 36 deletions.
67 changes: 41 additions & 26 deletions transport/internet/splithttp/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,23 +83,34 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in
return res.Resource.(DialerClient), res
}

func h(tlsConfig *tls.Config, realityConfig *reality.Config) int {
if realityConfig != nil {
return 2
}
if tlsConfig == nil {
return 1
}
if len(tlsConfig.NextProtocol) != 1 {
return 2
}
if tlsConfig.NextProtocol[0] == "http/1.1" {
return 1
}
if tlsConfig.NextProtocol[0] == "h3" {
return 3
}
return 2
}

func createHTTPClient(dest net.Destination, streamSettings *internet.MemoryStreamConfig) DialerClient {
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
realityConfig := reality.ConfigFromStreamSettings(streamSettings)

isH2 := false
isH3 := false

if tlsConfig != nil {
isH2 = !(len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "http/1.1")
isH3 = len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "h3"
} else if realityConfig != nil {
isH2 = true
isH3 = false
}
isH2 := h(tlsConfig, realityConfig) == 2
isH3 := h(tlsConfig, realityConfig) == 3

if isH3 {
dest.Network = net.Network_UDP
dest.Network = net.Network_UDP // better to keep this line
}

var gotlsConfig *gotls.Config
Expand Down Expand Up @@ -242,16 +253,16 @@ func init() {
}

func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
errors.LogInfo(ctx, "dialing splithttp to ", dest)

var requestURL url.URL

transportConfiguration := streamSettings.ProtocolSettings.(*Config)
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
realityConfig := reality.ConfigFromStreamSettings(streamSettings)

scMaxEachPostBytes := transportConfiguration.GetNormalizedScMaxEachPostBytes()
scMinPostsIntervalMs := transportConfiguration.GetNormalizedScMinPostsIntervalMs()
if h(tlsConfig, realityConfig) == 3 {
dest.Network = net.Network_UDP
}
errors.LogInfo(ctx, "XHTTP is dialing to: ", dest)

transportConfiguration := streamSettings.ProtocolSettings.(*Config)
var requestURL url.URL

if tlsConfig != nil || realityConfig != nil {
requestURL.Scheme = "https"
Expand All @@ -275,8 +286,8 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me

httpClient, muxRes := getHTTPClient(ctx, dest, streamSettings)

httpClient2 := httpClient
requestURL2 := requestURL
httpClient2 := httpClient
var muxRes2 *muxResource
if transportConfiguration.DownloadSettings != nil {
globalDialerAccess.Lock()
Expand All @@ -286,9 +297,12 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
globalDialerAccess.Unlock()
memory2 := streamSettings.DownloadSettings
dest2 := *memory2.Destination // just panic
httpClient2, muxRes2 = getHTTPClient(ctx, dest2, memory2)
tlsConfig2 := tls.ConfigFromStreamSettings(memory2)
realityConfig2 := reality.ConfigFromStreamSettings(memory2)
if h(tlsConfig2, realityConfig2) == 3 {
dest2.Network = net.Network_UDP
}
errors.LogInfo(ctx, "XHTTP is downloading from: ", dest2)
if tlsConfig2 != nil || realityConfig2 != nil {
requestURL2.Scheme = "https"
} else {
Expand All @@ -307,19 +321,20 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
}
requestURL2.Path = config2.GetNormalizedPath() + sessionIdUuid.String()
requestURL2.RawQuery = config2.GetNormalizedQuery()
httpClient2, muxRes2 = getHTTPClient(ctx, dest2, memory2)
}

mode := transportConfiguration.Mode
if mode == "" || mode == "auto" {
mode = "packet-up"
if (tlsConfig != nil && (len(tlsConfig.NextProtocol) != 1 || tlsConfig.NextProtocol[0] == "h2")) || realityConfig != nil {
if h(tlsConfig, realityConfig) == 2 {
mode = "stream-up"
}
if realityConfig != nil && transportConfiguration.DownloadSettings == nil {
mode = "stream-one"
}
}
errors.LogInfo(ctx, "XHTTP is using mode: "+mode)
errors.LogInfo(ctx, "XHTTP is using mode: ", mode)

var writer io.WriteCloser
var reader io.ReadCloser
Expand Down Expand Up @@ -373,6 +388,9 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
return stat.Connection(&conn), nil
}

scMaxEachPostBytes := transportConfiguration.GetNormalizedScMaxEachPostBytes()
scMinPostsIntervalMs := transportConfiguration.GetNormalizedScMinPostsIntervalMs()

maxUploadSize := scMaxEachPostBytes.roll()
// WithSizeLimit(0) will still allow single bytes to pass, and a lot of
// code relies on this behavior. Subtract 1 so that together with
Expand Down Expand Up @@ -408,10 +426,7 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
seq += 1

if scMinPostsIntervalMs.From > 0 {
sleep := time.Duration(scMinPostsIntervalMs.roll())*time.Millisecond - time.Since(lastWrite)
if sleep > 0 {
time.Sleep(sleep)
}
time.Sleep(time.Duration(scMinPostsIntervalMs.roll())*time.Millisecond - time.Since(lastWrite))
}

// by offloading the uploads into a buffered pipe, multiple conn.Write
Expand Down
18 changes: 9 additions & 9 deletions transport/internet/splithttp/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,30 +333,30 @@ func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSet
Net: "unix",
}, streamSettings.SocketSettings)
if err != nil {
return nil, errors.New("failed to listen unix domain socket(for SH) on ", address).Base(err)
return nil, errors.New("failed to listen UNIX domain socket for XHTTP on ", address).Base(err)
}
errors.LogInfo(ctx, "listening unix domain socket(for SH) on ", address)
errors.LogInfo(ctx, "listening UNIX domain socket for XHTTP on ", address)
} else if l.isH3 { // quic
Conn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{
IP: address.IP(),
Port: int(port),
}, streamSettings.SocketSettings)
if err != nil {
return nil, errors.New("failed to listen UDP(for SH3) on ", address, ":", port).Base(err)
return nil, errors.New("failed to listen UDP for XHTTP/3 on ", address, ":", port).Base(err)
}
h3listener, err := quic.ListenEarly(Conn, tlsConfig, nil)
if err != nil {
return nil, errors.New("failed to listen QUIC(for SH3) on ", address, ":", port).Base(err)
return nil, errors.New("failed to listen QUIC for XHTTP/3 on ", address, ":", port).Base(err)
}
l.h3listener = h3listener
errors.LogInfo(ctx, "listening QUIC(for SH3) on ", address, ":", port)
errors.LogInfo(ctx, "listening QUIC for XHTTP/3 on ", address, ":", port)

l.h3server = &http3.Server{
Handler: handler,
}
go func() {
if err := l.h3server.ServeListener(l.h3listener); err != nil {
errors.LogWarningInner(ctx, err, "failed to serve http3 for splithttp")
errors.LogWarningInner(ctx, err, "failed to serve HTTP/3 for XHTTP/3")
}
}()
} else { // tcp
Expand All @@ -369,9 +369,9 @@ func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSet
Port: int(port),
}, streamSettings.SocketSettings)
if err != nil {
return nil, errors.New("failed to listen TCP(for SH) on ", address, ":", port).Base(err)
return nil, errors.New("failed to listen TCP for XHTTP on ", address, ":", port).Base(err)
}
errors.LogInfo(ctx, "listening TCP(for SH) on ", address, ":", port)
errors.LogInfo(ctx, "listening TCP for XHTTP on ", address, ":", port)
}

// tcp/unix (h1/h2)
Expand All @@ -397,7 +397,7 @@ func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSet

go func() {
if err := l.server.Serve(l.listener); err != nil {
errors.LogWarningInner(ctx, err, "failed to serve http for splithttp")
errors.LogWarningInner(ctx, err, "failed to serve HTTP for XHTTP")
}
}()
}
Expand Down
2 changes: 1 addition & 1 deletion transport/internet/splithttp/upload_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (h *uploadQueue) Push(p Packet) error {
if p.Reader != nil {
p.Reader.Close()
}
return errors.New("splithttp packet queue closed")
return errors.New("packet queue closed")
}

h.pushedPackets <- p
Expand Down

0 comments on commit 5722488

Please sign in to comment.