Skip to content

Commit

Permalink
core: ToS for control plane (major)
Browse files Browse the repository at this point in the history
* client side, via intra-cluster client
* separately, server (listening) side
  - except HTTPS
* add feature flag: `Do-not-Set-Control-Plane-ToS`
* separately:
  - add feature flag: `Trust-Crypto-Safe-Checksums`

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Dec 22, 2024
1 parent c2ca3c9 commit 8185be3
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 78 deletions.
56 changes: 29 additions & 27 deletions ais/htcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ type (
muxers httpMuxers
sndRcvBufSize int
sync.Mutex
lowLatencyToS bool
}

nlogWriter struct{}
Expand Down Expand Up @@ -493,50 +494,44 @@ func (*nlogWriter) Write(p []byte) (int, error) {
// Override muxer ServeHTTP to support proxying HTTPS requests. Clients
// initiate all HTTPS requests with CONNECT method instead of GET/PUT etc.
func (server *netServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// plain
if r.Method != http.MethodConnect {
server.muxers.ServeHTTP(w, r)
return
}

// TODO: add support for caching HTTPS requests
destConn, err := net.DialTimeout("tcp", r.Host, 10*time.Second)
// HTTPS
destConn, err := net.DialTimeout("tcp", r.Host, cmn.DfltDialupTimeout)
if err != nil {
cmn.WriteErr(w, r, err, http.StatusServiceUnavailable)
return
}

// Second, hijack the connection. A kind of man-in-the-middle attack
// From this point on, this function is responsible for HTTP connection
// hijack the connection
hijacker, ok := w.(http.Hijacker)
if !ok {
cmn.WriteErr(w, r, errors.New("response writer does not support hijacking"),
http.StatusInternalServerError)
cmn.WriteErr(w, r, errors.New("response writer does not support hijacking"), http.StatusInternalServerError)
return
}

// First, send that everything is OK. Trying to write a header after
// hijacking generates a warning and nothing works
w.WriteHeader(http.StatusOK)

clientConn, _, err := hijacker.Hijack()
if err != nil {
// NOTE: cannot send error because we have already written a header.
nlog.Errorln(err)
return
}

// Third, start transparently sending data between source and destination
// by creating a tunnel between them
transfer := func(destination io.WriteCloser, source io.ReadCloser) {
io.Copy(destination, source)
source.Close()
destination.Close()
}
// send/receive both ways (bi-directional tunnel)
// (one of those close() calls will fail, the one that loses the race)
go _copy(destConn, clientConn)
go _copy(clientConn, destConn)
}

// NOTE: it looks like double closing both connections.
// Need to check how the tunnel works
go transfer(destConn, clientConn)
go transfer(clientConn, destConn)
func _copy(destination io.WriteCloser, source io.ReadCloser) {
io.Copy(destination, source)
source.Close()
destination.Close()
}

func (server *netServer) listen(addr string, logger *log.Logger, tlsConf *tls.Config, config *cmn.Config) (err error) {
Expand All @@ -555,9 +550,12 @@ func (server *netServer) listen(addr string, logger *log.Logger, tlsConf *tls.Co
if timeout, isSet := cmn.ParseReadHeaderTimeout(); isSet { // optional env var
server.s.ReadHeaderTimeout = timeout
}
if server.sndRcvBufSize > 0 && !config.Net.HTTP.UseHTTPS {
server.s.ConnState = server.connStateListener // setsockopt; see also cmn.NewTransport

// set sock options on the server side; NOTE https exclusion
if (server.sndRcvBufSize > 0 || server.lowLatencyToS) && !config.Net.HTTP.UseHTTPS {
server.s.ConnState = server.connStateListener
}

server.s.TLSConfig = tlsConf
server.Unlock()
retry:
Expand Down Expand Up @@ -611,10 +609,14 @@ func (server *netServer) connStateListener(c net.Conn, cs http.ConnState) {
return
}
tcpconn, ok := c.(*net.TCPConn)
cos.Assert(ok)
rawconn, _ := tcpconn.SyscallConn()
args := cmn.TransportArgs{SndRcvBufSize: server.sndRcvBufSize}
rawconn.Control(args.ConnControl(rawconn))
debug.Assert(ok)
rawconn, err := tcpconn.SyscallConn()
if err != nil {
nlog.Errorln("FATAL tcpconn.SyscallConn err:", err) // (unlikely)
return
}
args := cmn.TransportArgs{SndRcvBufSize: server.sndRcvBufSize, LowLatencyToS: server.lowLatencyToS}
args.ServerControl(rawconn)
}

func (server *netServer) shutdown(config *cmn.Config) {
Expand All @@ -625,7 +627,7 @@ func (server *netServer) shutdown(config *cmn.Config) {
}
ctx, cancel := context.WithTimeout(context.Background(), config.Timeout.MaxHostBusy.D())
if err := server.s.Shutdown(ctx); err != nil {
nlog.Infoln("http server shutdown err:", err)
nlog.Warningln("http server shutdown err:", err)
}
cancel()
}
Expand Down
4 changes: 2 additions & 2 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (h *htrun) init(config *cmn.Config) {
if h.si.IsProxy() {
tcpbuf = 0
} else if tcpbuf == 0 {
tcpbuf = cmn.DefaultSendRecvBufferSize // ditto: targets use AIS default when not configured
tcpbuf = cmn.DefaultSndRcvBufferSize // ditto: targets use AIS default when not configured
}

// PubNet enable tracing when configuration is set.
Expand All @@ -290,7 +290,7 @@ func (h *htrun) init(config *cmn.Config) {
// TODO: for now tracing is always disabled for intra-cluster traffic.
// Allow enabling through config.
muxers = newMuxers(false /*enableTracing*/)
g.netServ.control = &netServer{muxers: muxers, sndRcvBufSize: 0}
g.netServ.control = &netServer{muxers: muxers, sndRcvBufSize: 0, lowLatencyToS: true}
}
g.netServ.data = g.netServ.control // if not configured, intra-data net is intra-control
if config.HostNet.UseIntraData {
Expand Down
1 change: 1 addition & 0 deletions ais/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func initCtrlClient(config *cmn.Config) {
IdleConnTimeout: config.Net.HTTP.IdleConnTimeout.D(),
IdleConnsPerHost: config.Net.HTTP.MaxIdleConnsPerHost,
MaxIdleConns: config.Net.HTTP.MaxIdleConns,
LowLatencyToS: true,
}
if config.Net.HTTP.UseHTTPS {
g.client.control = cmn.NewIntraClientTLS(cargs, config)
Expand Down
37 changes: 30 additions & 7 deletions cmn/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,29 @@ import (
"github.com/NVIDIA/aistore/cmn/cos"
)

const DialupTimeoutDftl = 30 * time.Second
const (
DfltDialupTimeout = 10 * time.Second
DfltKeepaliveTCP = 30 * time.Second
)

// [NOTE]
// net/http.DefaultTransport has the following defaults:
//
// - MaxIdleConns: 100,
// - MaxIdleConnsPerHost : 2 (via DefaultMaxIdleConnsPerHost)
// - IdleConnTimeout: 90 * time.Second,
// - WriteBufferSize: 4KB
// - ReadBufferSize: 4KB
//
// Following are the defaults we use instead:
const (
DefaultMaxIdleConns = 0 // unlimited (in re: `http.errTooManyIdle`)
DefaultMaxIdleConnsPerHost = 32 // (http.errTooManyIdleHost)
DefaultIdleConnTimeout = 6 * time.Second // (Go default is 90s)
DefaultWriteBufferSize = 64 * cos.KiB
DefaultReadBufferSize = 64 * cos.KiB
DefaultSndRcvBufferSize = 128 * cos.KiB
)

type (
// assorted http(s) client options
Expand All @@ -33,6 +55,7 @@ type (
WriteBufferSize int
ReadBufferSize int
UseHTTPProxyEnv bool
LowLatencyToS bool
}
TLSArgs struct {
ClientCA string
Expand All @@ -47,17 +70,17 @@ type (
func NewTransport(cargs TransportArgs) *http.Transport {
var (
defaultTransport = http.DefaultTransport.(*http.Transport)
dialTimeout = cos.NonZero(cargs.DialTimeout, DialupTimeoutDftl)
dialTimeout = cos.NonZero(cargs.DialTimeout, DfltDialupTimeout)
)

dialer := &net.Dialer{
Timeout: dialTimeout,
KeepAlive: 30 * time.Second,
}
// setsockopt when non-zero, otherwise use TCP defaults
if cargs.SndRcvBufSize > 0 {
dialer.Control = cargs.setSockOpt
KeepAlive: DfltKeepaliveTCP,
}

// NOTE: setsockopt when (SndRcvBufSize > 0 and/or LowLatencyToS)
dialer.Control = cargs.clientControl()

transport := &http.Transport{
DialContext: dialer.DialContext,
TLSHandshakeTimeout: defaultTransport.TLSHandshakeTimeout,
Expand Down
2 changes: 2 additions & 0 deletions cmn/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,8 @@ const (
EcStreamsEver = -time.Second
EcStreamsDflt = 10 * time.Minute
EcStreamsMini = 5 * time.Minute

// and a few more hardcoded below
)

func (c *TimeoutConf) Validate() error {
Expand Down
14 changes: 10 additions & 4 deletions cmn/feat/feat.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ const (
StreamingColdGET // write and transmit cold-GET content back to user in parallel, without _finalizing_ in-cluster object
S3ReverseProxy // intra-cluster communications: instead of regular HTTP redirects reverse-proxy S3 API calls to designated targets
S3UsePathStyle // use older path-style addressing (as opposed to virtual-hosted style), e.g., https://s3.amazonaws.com/BUCKET/KEY
DontDeleteWhenRebalancing // when objects get rebalanced to their proper destinations, keep the sources - do not delete
DontDeleteWhenRebalancing // when objects get _rebalanced_ to their proper locations, do not delete their respective _misplaced_ sources
DontSetControlPlaneToS // intra-cluster control plane: do not set IPv4 ToS field (to low-latency)
TrustCryptoSafeChecksums // when checking whether objects are identical trust only cryptographically secure checksums
)

var Cluster = [...]string{
Expand All @@ -49,15 +51,18 @@ var Cluster = [...]string{
"Fsync-PUT",
"LZ4-Block-1MB",
"LZ4-Frame-Checksum",
"Dont-Allow-Passing-FQN-to-ETL",
"Do-not-Allow-Passing-FQN-to-ETL",
"Ignore-LimitedCoexistence-Conflicts",
"S3-Presigned-Request",
"Dont-Optimize-Listing-Virtual-Dirs",
"Do-not-Optimize-Listing-Virtual-Dirs",
"Disable-Cold-GET",
"Streaming-Cold-GET",
"S3-Reverse-Proxy",
"S3-Use-Path-Style", // https://aws.amazon.com/blogs/aws/amazon-s3-path-deprecation-plan-the-rest-of-the-story
"Dont-Delete-When-Rebalancing",
"Do-not-Delete-When-Rebalancing",
"Do-not-Set-Control-Plane-ToS",
"Trust-Crypto-Safe-Checksums",

// "none" ====================
}

Expand All @@ -68,6 +73,7 @@ var Bucket = [...]string{
"Disable-Cold-GET",
"Streaming-Cold-GET",
"S3-Use-Path-Style", // https://aws.amazon.com/blogs/aws/amazon-s3-path-deprecation-plan-the-rest-of-the-story

// "none" ====================
}

Expand Down
19 changes: 0 additions & 19 deletions cmn/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ import (
"fmt"
"net"
"strconv"
"time"

"github.com/NVIDIA/aistore/cmn/cos"
)

const (
Expand All @@ -20,22 +17,6 @@ const (
NetIntraData = "INTRA-DATA"
)

// http.DefaultTransport has the following defaults:
// - MaxIdleConns: 100,
// - MaxIdleConnsPerHost : 2 (via DefaultMaxIdleConnsPerHost)
// - IdleConnTimeout: 90 * time.Second,
// - WriteBufferSize: 4KB
// - ReadBufferSize: 4KB
// Following are the constants we use by default:
const (
DefaultMaxIdleConns = 0 // unlimited (in re: `http.errTooManyIdle`)
DefaultMaxIdleConnsPerHost = 32 // (http.errTooManyIdleHost)
DefaultIdleConnTimeout = 6 * time.Second // Go default is 90s
DefaultWriteBufferSize = 64 * cos.KiB
DefaultReadBufferSize = 64 * cos.KiB
DefaultSendRecvBufferSize = 128 * cos.KiB
)

var KnownNetworks = [...]string{NetPublic, NetIntraControl, NetIntraData}

func NetworkIsKnown(net string) bool {
Expand Down
90 changes: 77 additions & 13 deletions cmn/network_unix.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,92 @@
// Package cmn provides common constants, types, and utilities for AIS clients
// and AIStore.
/*
* Copyright (c) 2018-2021, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package cmn

import (
"syscall"

"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/feat"
"github.com/NVIDIA/aistore/cmn/nlog"
)

func (args *TransportArgs) setSockOpt(_, _ string, c syscall.RawConn) (err error) {
return c.Control(args.ConnControl(c))
// ref: https://linuxreviews.org/Type_of_Service_(ToS)_and_DSCP_Values
const (
lowDelayToS = 0x10
)

type (
cntlFunc func(network, address string, c syscall.RawConn) error
)

func (args *TransportArgs) ServerControl(c syscall.RawConn) {
switch {
case args.SndRcvBufSize > 0 && args.LowLatencyToS:
c.Control(args._sndrcvtos)
case args.SndRcvBufSize > 0:
c.Control(args._sndrcv)
case args.LowLatencyToS && !Rom.Features().IsSet(feat.DontSetControlPlaneToS):
c.Control(args._tos)
}
}

func (args *TransportArgs) clientControl() cntlFunc {
switch {
case args.SndRcvBufSize > 0 && args.LowLatencyToS:
return args.setSockSndRcvToS
case args.SndRcvBufSize > 0:
return args.setSockSndRcv
case args.LowLatencyToS && !Rom.Features().IsSet(feat.DontSetControlPlaneToS):
return args.setSockToS
}
return nil
}

//
//--------------------------- low level internals
//

func (args *TransportArgs) setSockSndRcv(_, _ string, c syscall.RawConn) (err error) {
return c.Control(args._sndrcv)
}

// buffering is limited by /proc/sys/net/core/rmem_max and /proc/sys/net/core/wmem_max, respectively
func (args *TransportArgs) _sndrcv(fd uintptr) {
err := syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_RCVBUF, args.SndRcvBufSize)
_croak(err)

err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_SNDBUF, args.SndRcvBufSize)
_croak(err)
}

func (args *TransportArgs) setSockToS(_, _ string, c syscall.RawConn) (err error) {
return c.Control(args._tos)
}

func (*TransportArgs) _tos(fd uintptr) {
err := syscall.SetsockoptInt(int(fd), syscall.IPPROTO_IP, syscall.IP_TOS, lowDelayToS)
_croak(err)
}

func (args *TransportArgs) setSockSndRcvToS(_, _ string, c syscall.RawConn) (err error) {
return c.Control(args._sndrcvtos)
}

func (args *TransportArgs) _sndrcvtos(fd uintptr) {
err := syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_RCVBUF, args.SndRcvBufSize)
_croak(err)

err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_SNDBUF, args.SndRcvBufSize)
_croak(err)

err = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_IP, syscall.IP_TOS, lowDelayToS)
_croak(err)
}

func (args *TransportArgs) ConnControl(_ syscall.RawConn) (cntl func(fd uintptr)) {
cntl = func(fd uintptr) {
// NOTE: is limited by /proc/sys/net/core/rmem_max
err := syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_RCVBUF, args.SndRcvBufSize)
debug.AssertNoErr(err)
// NOTE: is limited by /proc/sys/net/core/wmem_max
err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_SNDBUF, args.SndRcvBufSize)
debug.AssertNoErr(err)
func _croak(err error) {
if err != nil {
nlog.ErrorDepth(1, err)
}
return
}
Loading

0 comments on commit 8185be3

Please sign in to comment.