Skip to content

Commit

Permalink
[io] Fix _NativeSocket.shutdownRead
Browse files Browse the repository at this point in the history
When shutting down receive direction we need to ensure that
`closedRead` event is dispatched - as the receive direction
is only considered closed if the available data is drained
and `closedRead` is dispatched.

The code did not account for a possibility that socket has
no available data and as such is not dispatching read events
so closing receive direction and then separately closing
send direction would leave the socket in a state where
both directions are closed but the socket is not disposed
because `closedRead` is not dispatched - such socket objects
will simply leak (even if the other side terminates
the connection).

This CL also updates documentation around `RawSocket.shutdown`
and `RawSocket.readEventsEnabled` to make it clear that
users are responsible for draining accumulated data if
they want to shutdown receive direction.

Fixes #27414

TEST=standalone/io/issue_27414

CoreLibraryReviewExempt: Documentation only changes in VM specific library
Change-Id: I4b0ffb4cc67836c2849ec6e49b788a4f3b4c07d3
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/396340
Reviewed-by: Brian Quinlan <[email protected]>
Commit-Queue: Slava Egorov <[email protected]>
  • Loading branch information
mraleph authored and Commit Queue committed Nov 27, 2024
1 parent c872db1 commit e2c8c23
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 6 deletions.
13 changes: 12 additions & 1 deletion sdk/lib/_internal/vm/bin/socket_patch.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1623,21 +1623,29 @@ base class _NativeSocket extends _NativeSocketNativeWrapper
void issue() {
readEventIssued = false;
if (isClosing) return;
// Note: it is by design that we don't deliver closedRead event
// unless read events are enabled. This also means we will not
// fully close (and dispose) of the socket unless it is drained
// of accumulated incomming data.
if (!sendReadEvents) return;
if (stopRead()) {
if (isClosedRead && !closedReadEventSent) {
if (isClosedWrite) close();

var handler = closedEventHandler;
if (handler == null) return;

closedReadEventSent = true;
handler();
}
return;
}

var handler = readEventHandler;
if (handler == null) return;
readEventIssued = true;
handler();

readEventIssued = true;
scheduleMicrotask(issue);
}

Expand Down Expand Up @@ -1846,6 +1854,9 @@ base class _NativeSocket extends _NativeSocketNativeWrapper
sendToEventHandler(1 << shutdownReadCommand);
}
isClosedRead = true;
// Make sure to dispatch a closedRead event. Shutdown is only complete
// once the socket is drained of data and readClosed is dispatched.
issueReadEvent();
}
}

Expand Down
22 changes: 19 additions & 3 deletions sdk/lib/io/socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,12 @@ final class ConnectionTask<S> {
/// ([RawSocketEvent.closed]).
abstract interface class RawSocket implements Stream<RawSocketEvent> {
/// Set or get, if the [RawSocket] should listen for [RawSocketEvent.read]
/// events. Default is `true`.
/// and [RawSocketEvent.readClosed] events. Default is `true`.
///
/// Warning: setting [readEventsEnabled] to `false` might prevent socket
/// from fully closing when [SocketDirection.receive] and
/// [SocketDirection.send] directions are shutdown independently. See
/// [shutdown] for more details.
abstract bool readEventsEnabled;

/// Set or get, if the [RawSocket] should listen for [RawSocketEvent.write]
Expand All @@ -636,8 +641,8 @@ abstract interface class RawSocket implements Stream<RawSocketEvent> {
/// The [host] can either be a [String] or an [InternetAddress]. If [host] is a
/// [String], [connect] will perform a [InternetAddress.lookup] and try
/// all returned [InternetAddress]es, until connected. If IPv4 and IPv6
/// addresses are both availble then connections over IPv4 are preferred. If
/// no connection can be establed then the error from the first failing
/// addresses are both available then connections over IPv4 are preferred. If
/// no connection can be established then the error from the first failing
/// connection is returned.
///
/// The argument [sourceAddress] can be used to specify the local
Expand Down Expand Up @@ -788,6 +793,17 @@ abstract interface class RawSocket implements Stream<RawSocketEvent> {
/// and calling it several times is supported. Calling
/// shutdown with either [SocketDirection.both] or [SocketDirection.receive]
/// can result in a [RawSocketEvent.readClosed] event.
///
/// Warning: [SocketDirection.receive] direction is only considered to be
/// to be fully shutdown once all available data is drained and
/// [RawSocketEvent.readClosed] is dispatched. Shutting down
/// [SocketDirection.receive] and [SocketDirection.send] directions separately
/// without draining the data will lead to socket staying around until the
/// data is drained. This can happen if [readEventsEnabled] is set
/// to `false` or if received data is not [read] in response to these
/// events. This does not apply to shutting down both directions
/// simultaneously using [SocketDirection.both] which will discard all
/// received data instead.
void shutdown(SocketDirection direction);

/// Customize the [RawSocket].
Expand Down
8 changes: 6 additions & 2 deletions tests/standalone/io/issue_22636_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

import "dart:async";
import "dart:io";

import "package:expect/expect.dart";
import "package:expect/async_helper.dart";

final Duration delay = new Duration(milliseconds: 100);
final List<int> data = new List.generate(100, (i) => i % 20 + 65);
Expand All @@ -28,10 +30,11 @@ void serverListen(RawSocket serverSide) {
serverSide.writeEventsEnabled = true;
});
} else {
new Future.delayed(delay, () {
new Future.delayed(delay, () async {
Expect.isTrue(serverReadClosedReceived);
serverSide.shutdown(SocketDirection.send);
server.close();
await server.close();
asyncEnd();
});
}
break;
Expand All @@ -58,5 +61,6 @@ Future test() async {
}

void main() {
asyncStart();
test();
}
74 changes: 74 additions & 0 deletions tests/standalone/io/issue_27414_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
//
// This test verifies that shuting down receive and send directions separately
// on a socket correctly shuts the socket down instead of leaking it.

import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:expect/expect.dart';
import 'package:expect/async_helper.dart';

const messageContent = "hello, from the client!";
late RawServerSocket server;
late StreamSubscription clientSubscription;

void handleConnection(RawSocket serverSide) {
var readClosedReceived = false;

void serveData(RawSocketEvent event) async {
switch (event) {
case RawSocketEvent.read:
final data = serverSide.read();
Expect.equals(messageContent, utf8.decode(data!));

// There might be a read event in flight, wait for microtasks to drain
// and then shutdown read and write directions separately. This
// should cause [readClosed] to be dispatched.
Future.delayed(Duration(milliseconds: 0), () {
serverSide.shutdown(SocketDirection.receive);
serverSide.shutdown(SocketDirection.send);
});
break;

case RawSocketEvent.readClosed:
Expect.isFalse(readClosedReceived);
readClosedReceived = true;
break;

case RawSocketEvent.closed:
Expect.isTrue(readClosedReceived);
await clientSubscription.cancel();
await server.close();
asyncEnd();
break;
}
}

serverSide.listen(serveData);
}

Future test() async {
server = await RawServerSocket.bind(InternetAddress.loopbackIPv4, 0);
server.listen(handleConnection);

final client = await RawSocket.connect(
InternetAddress.loopbackIPv4,
server.port,
);
clientSubscription = client.listen((RawSocketEvent event) {
switch (event) {
case RawSocketEvent.write:
client.write(utf8.encode(messageContent));
break;
}
});
}

void main() {
asyncStart();
test();
}

0 comments on commit e2c8c23

Please sign in to comment.