From 63df97379baec4c8f690783b26a359f1cdb1a58e Mon Sep 17 00:00:00 2001 From: Laramie Leavitt Date: Wed, 27 Nov 2024 09:07:29 -0800 Subject: [PATCH] Add TraceSpans to tensorstore TraceSpan adds scoped traces to tensorstore, however there is no current integration with any other telemetry libraries. LoggingTraceSpan incorporates TraceSpan along with scoped logging. Also inspects Executor uses and updates some TraceContext propagation. PiperOrigin-RevId: 700720206 Change-Id: I324c544b6588516e2c0de9a701d5e49cda726065 --- tensorstore/driver/BUILD | 1 + tensorstore/driver/copy.cc | 2 + tensorstore/driver/read.cc | 3 + tensorstore/driver/write.cc | 3 + tensorstore/internal/os/BUILD | 1 + tensorstore/internal/os/file_util_posix.cc | 184 +++++++++--------- tensorstore/internal/os/file_util_win.cc | 165 ++++++++-------- tensorstore/internal/thread/BUILD | 3 +- tensorstore/internal/thread/pool_impl_test.cc | 8 +- tensorstore/internal/thread/schedule_at.cc | 2 +- tensorstore/internal/thread/task.h | 8 +- tensorstore/internal/thread/thread_pool.cc | 23 ++- tensorstore/internal/tracing/BUILD | 41 +++- .../internal/tracing/logged_trace_span.cc | 45 +++++ .../internal/tracing/logged_trace_span.h | 147 ++++++++++++++ tensorstore/internal/tracing/span_attribute.h | 64 ++++++ .../tracing/{tracing.h => trace_context.h} | 8 +- tensorstore/internal/tracing/trace_span.cc | 15 ++ tensorstore/internal/tracing/trace_span.h | 57 ++++++ tensorstore/internal/tracing/trace_test.cc | 86 ++++++++ tensorstore/util/BUILD | 2 +- tensorstore/util/executor.h | 13 +- tensorstore/util/future_impl.h | 12 +- third_party/com_google_absl/workspace.bzl | 2 + 24 files changed, 704 insertions(+), 191 deletions(-) create mode 100644 tensorstore/internal/tracing/logged_trace_span.cc create mode 100644 tensorstore/internal/tracing/logged_trace_span.h create mode 100644 tensorstore/internal/tracing/span_attribute.h rename tensorstore/internal/tracing/{tracing.h => trace_context.h} (84%) create mode 100644 tensorstore/internal/tracing/trace_span.cc create mode 100644 tensorstore/internal/tracing/trace_span.h create mode 100644 tensorstore/internal/tracing/trace_test.cc diff --git a/tensorstore/driver/BUILD b/tensorstore/driver/BUILD index cb5340456..23f3dc809 100644 --- a/tensorstore/driver/BUILD +++ b/tensorstore/driver/BUILD @@ -151,6 +151,7 @@ tensorstore_cc_library( "//tensorstore/internal/json_binding", "//tensorstore/internal/json_binding:bindable", "//tensorstore/internal/json_binding:data_type", + "//tensorstore/internal/tracing", "//tensorstore/kvstore", "//tensorstore/serialization", "//tensorstore/serialization:registry", diff --git a/tensorstore/driver/copy.cc b/tensorstore/driver/copy.cc index e868d4d3e..5b04408b8 100644 --- a/tensorstore/driver/copy.cc +++ b/tensorstore/driver/copy.cc @@ -35,6 +35,7 @@ #include "tensorstore/internal/nditerable_data_type_conversion.h" #include "tensorstore/internal/nditerable_util.h" #include "tensorstore/internal/tagged_ptr.h" +#include "tensorstore/internal/tracing/trace_span.h" #include "tensorstore/open_mode.h" #include "tensorstore/progress.h" #include "tensorstore/read_write_options.h" @@ -140,6 +141,7 @@ struct CopyState : public internal::AtomicReferenceCount { Promise copy_promise; Promise commit_promise; IntrusivePtr commit_state{new CommitState}; + internal_tracing::TraceSpan tspan{"tensorstore.Copy"}; void SetError(absl::Status error) { SetDeferredResult(copy_promise, std::move(error)); diff --git a/tensorstore/driver/read.cc b/tensorstore/driver/read.cc index 69676b91c..21acbd1ef 100644 --- a/tensorstore/driver/read.cc +++ b/tensorstore/driver/read.cc @@ -42,6 +42,7 @@ #include "tensorstore/internal/nditerable_transformed_array.h" #include "tensorstore/internal/nditerable_util.h" #include "tensorstore/internal/tagged_ptr.h" +#include "tensorstore/internal/tracing/trace_span.h" #include "tensorstore/internal/type_traits.h" #include "tensorstore/open_mode.h" #include "tensorstore/progress.h" @@ -50,6 +51,7 @@ #include "tensorstore/resize_options.h" #include "tensorstore/transaction.h" #include "tensorstore/util/element_pointer.h" +#include "tensorstore/util/execution/any_receiver.h" #include "tensorstore/util/execution/sender.h" #include "tensorstore/util/executor.h" #include "tensorstore/util/extents.h" @@ -108,6 +110,7 @@ struct ReadState Promise promise; std::atomic copied_elements{0}; Index total_elements; + internal_tracing::TraceSpan tspan{"tensorstore.Read"}; void SetError(absl::Status error) { SetDeferredResult(promise, std::move(error)); diff --git a/tensorstore/driver/write.cc b/tensorstore/driver/write.cc index e3d197f9c..262542b76 100644 --- a/tensorstore/driver/write.cc +++ b/tensorstore/driver/write.cc @@ -36,6 +36,7 @@ #include "tensorstore/internal/nditerable_transformed_array.h" #include "tensorstore/internal/nditerable_util.h" #include "tensorstore/internal/tagged_ptr.h" +#include "tensorstore/internal/tracing/trace_span.h" #include "tensorstore/internal/type_traits.h" #include "tensorstore/open_mode.h" #include "tensorstore/progress.h" @@ -43,6 +44,7 @@ #include "tensorstore/resize_options.h" #include "tensorstore/transaction.h" #include "tensorstore/util/element_pointer.h" +#include "tensorstore/util/execution/any_receiver.h" #include "tensorstore/util/execution/sender.h" #include "tensorstore/util/executor.h" #include "tensorstore/util/future.h" @@ -125,6 +127,7 @@ struct WriteState : public internal::AtomicReferenceCount { Promise copy_promise; Promise commit_promise; IntrusivePtr commit_state{new CommitState}; + internal_tracing::TraceSpan tspan{"tensorstore.Write"}; void SetError(absl::Status error) { SetDeferredResult(copy_promise, std::move(error)); diff --git a/tensorstore/internal/os/BUILD b/tensorstore/internal/os/BUILD index cea9ad604..e7c3d6285 100644 --- a/tensorstore/internal/os/BUILD +++ b/tensorstore/internal/os/BUILD @@ -73,6 +73,7 @@ tensorstore_cc_library( "//tensorstore/internal/log:verbose_flag", "//tensorstore/internal/metrics", "//tensorstore/internal/metrics:metadata", + "//tensorstore/internal/tracing", "//tensorstore/util:quote_string", "//tensorstore/util:result", "//tensorstore/util:status", diff --git a/tensorstore/internal/os/file_util_posix.cc b/tensorstore/internal/os/file_util_posix.cc index 9759ef7e0..5a957eb30 100644 --- a/tensorstore/internal/os/file_util_posix.cc +++ b/tensorstore/internal/os/file_util_posix.cc @@ -35,6 +35,7 @@ #include #include #include +#include #include "absl/base/attributes.h" #include "absl/base/config.h" @@ -52,6 +53,7 @@ #include "tensorstore/internal/metrics/metadata.h" #include "tensorstore/internal/os/error_code.h" #include "tensorstore/internal/os/potentially_blocking_region.h" +#include "tensorstore/internal/tracing/logged_trace_span.h" #include "tensorstore/util/quote_string.h" #include "tensorstore/util/result.h" #include "tensorstore/util/status.h" @@ -69,6 +71,7 @@ using ::tensorstore::internal::PotentiallyBlockingRegion; using ::tensorstore::internal::StatusFromOsError; +using ::tensorstore::internal_tracing::LoggedTraceSpan; // On FreeBSD and Mac OS X, `flock` can safely be used instead of open file // descriptor locks. `flock`/`fcntl`/`lockf` all use the same underlying lock @@ -111,23 +114,14 @@ auto& mmap_active = internal_metrics::Gauge::New( ABSL_CONST_INIT internal_log::VerboseFlag detail_logging("file_detail"); -#define TS_DETAIL_LOG_BEGIN \ - ABSL_LOG_IF(INFO, detail_logging.Level(1)) << "Begin: " << __func__ - -#define TS_DETAIL_LOG_END \ - ABSL_LOG_IF(INFO, detail_logging.Level(1)) << "End: " << __func__ - -#define TS_DETAIL_LOG_ERROR \ - ABSL_LOG_IF(INFO, detail_logging.Level(1)) \ - << "Error: " << __func__ << " " << static_cast(errno) - #if defined(F_OFD_SETLKW) void UnlockFcntlLock(FileDescriptor fd) { + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"fd", fd}}); // This releases a lock acquired by fcntl(F_OFD_SETLKW). // This is not strictly necessary as the posix/linux locks will be released // when the fd is closed, but it allows easier reasoning by making locking // behave similarly across platforms. - TS_DETAIL_LOG_BEGIN << " fd=" << fd; + while (true) { struct ::flock lock; lock.l_type = F_UNLCK; @@ -138,12 +132,11 @@ void UnlockFcntlLock(FileDescriptor fd) { { PotentiallyBlockingRegion region; if (::fcntl(fd, F_OFD_SETLK, &lock) != -1) { - TS_DETAIL_LOG_END << " fd=" << fd; return; } } - TS_DETAIL_LOG_ERROR << " fd=" << fd; if (errno == EINTR) continue; + tspan.Log("errno", errno); return; } ABSL_UNREACHABLE(); @@ -151,17 +144,17 @@ void UnlockFcntlLock(FileDescriptor fd) { #endif void UnlockFlockLock(FileDescriptor fd) { - TS_DETAIL_LOG_BEGIN << " fd=" << fd; + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"fd", fd}}); + while (true) { { PotentiallyBlockingRegion region; if (::flock(fd, LOCK_UN) != -1) { - TS_DETAIL_LOG_END << " fd=" << fd; return; } } if (errno == EINTR) continue; - TS_DETAIL_LOG_ERROR << " fd=" << fd; + tspan.Log("errno", errno); return; } ABSL_UNREACHABLE(); @@ -170,20 +163,21 @@ void UnlockFlockLock(FileDescriptor fd) { } // namespace void FileDescriptorTraits::Close(FileDescriptor fd) { - TS_DETAIL_LOG_BEGIN << " fd=" << fd; + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"fd", fd}}); + while (true) { if (::close(fd) == 0) { - TS_DETAIL_LOG_END << " fd=" << fd; return; } if (errno == EINTR) continue; - TS_DETAIL_LOG_ERROR << " fd=" << fd; + tspan.Log("errno", errno); return; } } Result AcquireFdLock(FileDescriptor fd) { - TS_DETAIL_LOG_BEGIN << " fd=" << fd; + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"fd", fd}}); + #if defined(F_OFD_SETLKW) while (true) { // This blocks until the lock is acquired (SETLKW). If any signal is @@ -203,26 +197,24 @@ Result AcquireFdLock(FileDescriptor fd) { { PotentiallyBlockingRegion region; if (::fcntl(fd, F_OFD_SETLKW, &lock) != -1) { - TS_DETAIL_LOG_END << " fd=" << fd; return UnlockFcntlLock; } } if (errno == EINTR) continue; if (errno == EINVAL || errno == ENOTSUP) break; - TS_DETAIL_LOG_ERROR << " fd=" << fd; - return StatusFromOsError(errno, "Failed to lock file"); + auto status = StatusFromOsError(errno, "Failed to lock file"); + return std::move(tspan).EndWithStatus(std::move(status)); } #endif while (true) { { PotentiallyBlockingRegion region; if (::flock(fd, LOCK_EX) != -1) { - TS_DETAIL_LOG_END << " fd=" << fd; return UnlockFlockLock; } if (errno == EINTR) continue; - TS_DETAIL_LOG_ERROR << " fd=" << fd; - return StatusFromOsError(errno, "Failed to lock file"); + auto status = StatusFromOsError(errno, "Failed to lock file"); + return std::move(tspan).EndWithStatus(std::move(status)); } } ABSL_UNREACHABLE(); @@ -230,7 +222,8 @@ Result AcquireFdLock(FileDescriptor fd) { Result OpenFileWrapper(const std::string& path, OpenFlags flags) { - TS_DETAIL_LOG_BEGIN << " path=" << QuoteString(path); + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"path", path}}); + FileDescriptor fd = FileDescriptorTraits::Invalid(); const auto attempt_open = [&] { PotentiallyBlockingRegion region; @@ -254,33 +247,36 @@ Result OpenFileWrapper(const std::string& path, #endif if (fd == FileDescriptorTraits::Invalid()) { - TS_DETAIL_LOG_ERROR << " path=" << QuoteString(path); - return StatusFromOsError(errno, "Failed to open: ", QuoteString(path)); + auto status = + StatusFromOsError(errno, "Failed to open: ", QuoteString(path)); + return std::move(tspan).EndWithStatus(std::move(status)); + } else { + tspan.Log("fd", fd); + return UniqueFileDescriptor(fd); } - TS_DETAIL_LOG_END << " path=" << QuoteString(path) << ", fd=" << fd; - return UniqueFileDescriptor(fd); } Result ReadFromFile(FileDescriptor fd, void* buf, size_t count, int64_t offset) { - TS_DETAIL_LOG_BEGIN << " fd=" << fd << ", count=" << count - << ", offset=" << offset; + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), + {{"fd", fd}, {"count", count}, {"offset", offset}}); ssize_t n; do { PotentiallyBlockingRegion region; n = ::pread(fd, buf, count, static_cast(offset)); } while ((n < 0) && (errno == EINTR || errno == EAGAIN)); if (n >= 0) { - TS_DETAIL_LOG_END << " fd=" << fd << ", n=" << n; return n; } - TS_DETAIL_LOG_ERROR << " fd=" << fd; - return StatusFromOsError(errno, "Failed to read from file"); + auto status = StatusFromOsError(errno, "Failed to read from file"); + return std::move(tspan).EndWithStatus(std::move(status)); } Result WriteToFile(FileDescriptor fd, const void* buf, size_t count) { - TS_DETAIL_LOG_BEGIN << " fd=" << fd << ", count=" << count; + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), + {{"fd", fd}, {"count", count}}); + ssize_t n; do { PotentiallyBlockingRegion region; @@ -289,15 +285,16 @@ Result WriteToFile(FileDescriptor fd, const void* buf, if (count != 0 && n == 0) { errno = ENOSPC; } else if (n >= 0) { - TS_DETAIL_LOG_END << " fd=" << fd << ", n=" << n; return n; } - TS_DETAIL_LOG_ERROR << " fd=" << fd; - return StatusFromOsError(errno, "Failed to write to file"); + auto status = StatusFromOsError(errno, "Failed to write to file"); + return std::move(tspan).EndWithStatus(std::move(status)); } Result WriteCordToFile(FileDescriptor fd, absl::Cord value) { - TS_DETAIL_LOG_BEGIN << " fd=" << fd << ", count=" << value.size(); + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), + {{"fd", fd}, {"count", value.size()}}); + absl::InlinedVector iovs; for (std::string_view chunk : value.Chunks()) { struct iovec iov; @@ -315,59 +312,61 @@ Result WriteCordToFile(FileDescriptor fd, absl::Cord value) { if (!value.empty() && n == 0) { errno = ENOSPC; } else if (n >= 0) { - TS_DETAIL_LOG_END << " fd=" << fd << ", n=" << n; return n; } - TS_DETAIL_LOG_ERROR << " fd=" << fd; - return StatusFromOsError(errno, "Failed to write to file"); + auto status = StatusFromOsError(errno, "Failed to write to file"); + return std::move(tspan).EndWithStatus(std::move(status)); } absl::Status TruncateFile(FileDescriptor fd) { + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"fd", fd}}); PotentiallyBlockingRegion region; if (::ftruncate(fd, 0) == 0) { return absl::OkStatus(); } - return StatusFromOsError(errno, "Failed to truncate file"); + auto status = StatusFromOsError(errno, "Failed to truncate file"); + return std::move(tspan).EndWithStatus(std::move(status)); } absl::Status RenameOpenFile(FileDescriptor fd, const std::string& old_name, const std::string& new_name) { - TS_DETAIL_LOG_BEGIN << " fd=" << fd << ", old_name=" << QuoteString(old_name) - << ", new_name=" << QuoteString(new_name); + LoggedTraceSpan tspan( + __func__, detail_logging.Level(1), + {{"fd", fd}, {"old_name", old_name}, {"new_name", new_name}}); PotentiallyBlockingRegion region; if (::rename(old_name.c_str(), new_name.c_str()) == 0) { - TS_DETAIL_LOG_END << " fd=" << fd << ", old_name=" << QuoteString(old_name) - << ", new_name=" << QuoteString(new_name); return absl::OkStatus(); } - TS_DETAIL_LOG_ERROR << " fd=" << fd << ", old_name=" << QuoteString(old_name) - << ", new_name=" << QuoteString(new_name); - return StatusFromOsError(errno, "Failed to rename fd: ", absl::StrCat(fd), - " ", QuoteString(old_name), - " to: ", QuoteString(new_name)); + auto status = + StatusFromOsError(errno, "Failed to rename fd: ", absl::StrCat(fd), " ", + QuoteString(old_name), " to: ", QuoteString(new_name)); + return std::move(tspan).EndWithStatus(std::move(status)); } absl::Status DeleteOpenFile(FileDescriptor fd, const std::string& path) { - TS_DETAIL_LOG_BEGIN << " fd=" << fd << ", path=" << QuoteString(path); + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), + {{"fd", fd}, {"path", path}}); + PotentiallyBlockingRegion region; if (::unlink(path.c_str()) == 0) { - TS_DETAIL_LOG_END << " fd=" << fd; return absl::OkStatus(); } - TS_DETAIL_LOG_ERROR << " fd=" << fd; - return StatusFromOsError(errno, "Failed to delete: ", QuoteString(path)); + auto status = + StatusFromOsError(errno, "Failed to delete: ", QuoteString(path)); + return std::move(tspan).EndWithStatus(std::move(status)); } absl::Status DeleteFile(const std::string& path) { - TS_DETAIL_LOG_BEGIN << " path=" << QuoteString(path); + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"path", path}}); + PotentiallyBlockingRegion region; if (::unlink(path.c_str()) == 0) { - TS_DETAIL_LOG_END << " path=" << QuoteString(path); return absl::OkStatus(); } - TS_DETAIL_LOG_ERROR << " path=" << QuoteString(path); - return StatusFromOsError(errno, "Failed to delete: ", QuoteString(path)); + auto status = + StatusFromOsError(errno, "Failed to delete: ", QuoteString(path)); + return std::move(tspan).EndWithStatus(std::move(status)); } uint32_t GetDefaultPageSize() { @@ -379,9 +378,10 @@ uint32_t GetDefaultPageSize() { Result MemmapFileReadOnly(FileDescriptor fd, size_t offset, size_t size) { - TS_DETAIL_LOG_BEGIN << " fd=" << fd << ", offset=" << offset - << ", size=" << size; #if ABSL_HAVE_MMAP + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), + {{"fd", fd}, {"offset", offset}, {"size", size}}); + if (offset > 0 && offset % GetDefaultPageSize() != 0) { return absl::InvalidArgumentError( "Offset must be a multiple of the default page size."); @@ -389,36 +389,33 @@ Result MemmapFileReadOnly(FileDescriptor fd, size_t offset, if (size == 0) { struct ::stat info; if (::fstat(fd, &info) != 0) { - TS_DETAIL_LOG_ERROR << " fd=" << fd; - return StatusFromOsError(errno, "Failed to stat"); + return std::move(tspan).EndWithStatus( + StatusFromOsError(errno, "Failed to stat")); } uint64_t file_size = GetSize(info); if (offset + size > file_size) { - TS_DETAIL_LOG_ERROR << " fd=" << fd; - return absl::OutOfRangeError( + return std::move(tspan).EndWithStatus(absl::OutOfRangeError( absl::StrCat("Requested offset ", offset, " + size ", size, - " exceeds file size ", file_size)); + " exceeds file size ", file_size))); } else if (size == 0) { size = file_size - offset; } if (size == 0) { - TS_DETAIL_LOG_END << " fd=" << fd; return MappedRegion(nullptr, 0); } } void* address = ::mmap(nullptr, size, PROT_READ, MAP_PRIVATE, fd, 0); if (address == MAP_FAILED) { - TS_DETAIL_LOG_ERROR << " fd=" << fd; - return StatusFromOsError(errno, "Failed to mmap"); + return std::move(tspan).EndWithStatus( + StatusFromOsError(errno, "Failed to mmap")); } ::madvise(address, size, MADV_WILLNEED); mmap_count.Increment(); mmap_bytes.IncrementBy(size); mmap_active.Increment(); - TS_DETAIL_LOG_END << " fd=" << fd; return MappedRegion(static_cast(address), size); #else return absl::UnimplementedError("::mmap not supported"); @@ -427,7 +424,6 @@ Result MemmapFileReadOnly(FileDescriptor fd, size_t offset, MappedRegion::~MappedRegion() { if (data_) { - TS_DETAIL_LOG_BEGIN; if (::munmap(const_cast(data_), size_) != 0) { ABSL_LOG(FATAL) << StatusFromOsError(errno, "Failed to unmap file"); } @@ -436,66 +432,72 @@ MappedRegion::~MappedRegion() { } absl::Status FsyncFile(FileDescriptor fd) { - TS_DETAIL_LOG_BEGIN << " fd=" << fd; + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"fd", fd}}); PotentiallyBlockingRegion region; if (::fsync(fd) == 0) { - TS_DETAIL_LOG_END << " fd=" << fd; return absl::OkStatus(); } - TS_DETAIL_LOG_ERROR << " fd=" << fd; - return StatusFromOsError(errno); + auto status = StatusFromOsError(errno, "Failed to fsync file"); + return std::move(tspan).EndWithStatus(std::move(status)); } absl::Status GetFileInfo(FileDescriptor fd, FileInfo* info) { - TS_DETAIL_LOG_BEGIN << " fd=" << fd; + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"fd", fd}}); + PotentiallyBlockingRegion region; if (::fstat(fd, info) == 0) { - TS_DETAIL_LOG_END << " fd=" << fd; return absl::OkStatus(); } - TS_DETAIL_LOG_ERROR << " fd=" << fd; - return StatusFromOsError(errno); + auto status = StatusFromOsError(errno, "Failed to get file info"); + return std::move(tspan).EndWithStatus(std::move(status)); } absl::Status GetFileInfo(const std::string& path, FileInfo* info) { - TS_DETAIL_LOG_BEGIN << " path=" << QuoteString(path); + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"path", path}}); + PotentiallyBlockingRegion region; if (::stat(path.c_str(), info) == 0) { - TS_DETAIL_LOG_END << " path=" << QuoteString(path); return absl::OkStatus(); } - TS_DETAIL_LOG_ERROR << " path=" << QuoteString(path); - return StatusFromOsError(errno); + auto status = StatusFromOsError(errno); + return std::move(tspan).EndWithStatus(std::move(status)); } Result OpenDirectoryDescriptor(const std::string& path) { + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"path", path}}); FileDescriptor fd; { PotentiallyBlockingRegion region; fd = ::open(path.c_str(), O_RDONLY | O_DIRECTORY | O_CLOEXEC, 0); } if (fd == FileDescriptorTraits::Invalid()) { - return StatusFromOsError(errno, - "Failed to open directory: ", QuoteString(path)); + auto status = StatusFromOsError( + errno, "Failed to open directory: ", QuoteString(path)); + return std::move(tspan).EndWithStatus(std::move(status)); } return UniqueFileDescriptor(fd); } absl::Status MakeDirectory(const std::string& path) { + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"path", path}}); PotentiallyBlockingRegion region; if (::mkdir(path.c_str(), 0777) == 0 || errno == EEXIST) { return absl::OkStatus(); } - return StatusFromOsError(errno, - "Failed to create directory: ", QuoteString(path)); + auto status = StatusFromOsError( + errno, "Failed to create directory: ", QuoteString(path)); + return std::move(tspan).EndWithStatus(std::move(status)); } absl::Status FsyncDirectory(FileDescriptor fd) { + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"fd", fd}}); + PotentiallyBlockingRegion region; if (::fsync(fd) == 0) { return absl::OkStatus(); } - return StatusFromOsError(errno); + auto status = StatusFromOsError(errno, "Failed to fsync directory"); + return std::move(tspan).EndWithStatus(std::move(status)); } } // namespace internal_os diff --git a/tensorstore/internal/os/file_util_win.cc b/tensorstore/internal/os/file_util_win.cc index 420b986c9..672217cf4 100644 --- a/tensorstore/internal/os/file_util_win.cc +++ b/tensorstore/internal/os/file_util_win.cc @@ -73,6 +73,7 @@ #include #include #include +#include #include "absl/base/attributes.h" #include "absl/base/optimization.h" @@ -90,6 +91,7 @@ #include "tensorstore/internal/os/error_code.h" #include "tensorstore/internal/os/potentially_blocking_region.h" #include "tensorstore/internal/os/wstring.h" +#include "tensorstore/internal/tracing/logged_trace_span.h" #include "tensorstore/util/quote_string.h" #include "tensorstore/util/result.h" #include "tensorstore/util/status.h" @@ -103,6 +105,7 @@ using ::tensorstore::internal::ConvertUTF8ToWindowsWide; using ::tensorstore::internal::ConvertWindowsWideToUTF8; using ::tensorstore::internal::StatusFromOsError; +using ::tensorstore::internal_tracing::LoggedTraceSpan; namespace tensorstore { namespace internal_os { @@ -122,16 +125,6 @@ auto& mmap_active = internal_metrics::Gauge::New( ABSL_CONST_INIT internal_log::VerboseFlag detail_logging("file_detail"); -#define TS_DETAIL_LOG_BEGIN \ - ABSL_LOG_IF(INFO, detail_logging.Level(1)) << "Begin: " << __func__ - -#define TS_DETAIL_LOG_END \ - ABSL_LOG_IF(INFO, detail_logging.Level(1)) << "End: " << __func__ - -#define TS_DETAIL_LOG_ERROR \ - ABSL_LOG_IF(INFO, detail_logging.Level(1)) \ - << "Error: " << __func__ << " " << ::GetLastError() - /// Maximum length of Windows path, including terminating NUL. constexpr size_t kMaxWindowsPathSize = 32768; @@ -152,6 +145,7 @@ inline ::OVERLAPPED GetLockOverlapped() { } bool RenameFilePosix(FileDescriptor fd, const std::wstring& new_name) { + LoggedTraceSpan tspan(__func__, detail_logging.Level(1)); alignas(::FILE_RENAME_INFO) char file_rename_info_buffer[sizeof(::FILE_RENAME_INFO) + kMaxWindowsPathSize - 1]; @@ -167,6 +161,8 @@ bool RenameFilePosix(FileDescriptor fd, const std::wstring& new_name) { } bool DeleteFilePosix(FileDescriptor fd) { + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"handle", fd}}); + FileDispositionInfoExData disposition_info; disposition_info.Flags = 0x00000001 /*FILE_DISPOSITION_DELETE*/ | 0x00000002 /*FILE_DISPOSITION_POSIX_SEMANTICS*/; @@ -191,16 +187,17 @@ Result GetFileAttributes(const std::wstring& filename) { #endif void UnlockWin32Lock(FileDescriptor fd) { - TS_DETAIL_LOG_BEGIN << " handle=" << fd; + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"handle", fd}}); + auto lock_offset = GetLockOverlapped(); // Ignore any errors. ::UnlockFileEx(fd, /*dwReserved=*/0, /*nNumberOfBytesToUnlockLow=*/1, /*nNumberOfBytesToUnlockHigh=*/0, /*lpOverlapped=*/&lock_offset); - TS_DETAIL_LOG_END << " handle=" << fd; } FileDescriptor OpenFileImpl(const std::wstring& wpath, OpenFlags flags) { + LoggedTraceSpan tspan(__func__, detail_logging.Level(1)); // Setup Win32 flags to somewhat mimic the POSIX flags. DWORD access = 0; if ((flags & OpenFlags::OpenReadOnly) == OpenFlags::OpenReadOnly) { @@ -241,46 +238,49 @@ FileDescriptor OpenFileImpl(const std::wstring& wpath, OpenFlags flags) { } // namespace void FileDescriptorTraits::Close(FileDescriptor fd) { - TS_DETAIL_LOG_BEGIN << " handle=" << fd; + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"handle", fd}}); + ::CloseHandle(fd); - TS_DETAIL_LOG_END << " handle=" << fd; } Result AcquireFdLock(FileDescriptor fd) { - TS_DETAIL_LOG_BEGIN << " handle=" << fd; + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"handle", fd}}); + auto lock_offset = GetLockOverlapped(); if (::LockFileEx(fd, /*dwFlags=*/LOCKFILE_EXCLUSIVE_LOCK, /*dwReserved=*/0, /*nNumberOfBytesToLockLow=*/1, /*nNumberOfBytesToLockHigh=*/0, /*lpOverlapped=*/&lock_offset)) { - TS_DETAIL_LOG_END << " handle=" << fd; return UnlockWin32Lock; } - TS_DETAIL_LOG_ERROR << " handle=" << fd; - return StatusFromOsError(::GetLastError(), "Failed to lock file"); + auto status = StatusFromOsError(::GetLastError(), "Failed to lock file"); + return std::move(tspan).EndWithStatus(std::move(status)); } Result OpenFileWrapper(const std::string& path, OpenFlags flags) { - TS_DETAIL_LOG_BEGIN << " path=" << QuoteString(path); + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"path", path}}); + std::wstring wpath; TENSORSTORE_RETURN_IF_ERROR(ConvertUTF8ToWindowsWide(path, wpath)); FileDescriptor fd = OpenFileImpl(wpath, flags); if (fd == FileDescriptorTraits::Invalid()) { - TS_DETAIL_LOG_ERROR << " path=\"" << path << "\""; - return StatusFromOsError(::GetLastError(), - "Failed to open: ", QuoteString(path)); + auto status = StatusFromOsError(::GetLastError(), + "Failed to open: ", QuoteString(path)); + return std::move(tspan).EndWithStatus(std::move(status)); } - TS_DETAIL_LOG_END << " path=" << QuoteString(path) << ", handle=" << fd; + tspan.Log("fd", fd); return UniqueFileDescriptor(fd); } Result ReadFromFile(FileDescriptor fd, void* buf, size_t count, int64_t offset) { - TS_DETAIL_LOG_BEGIN << " handle=" << fd; + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), + {{"handle", fd}, {"size", count}}); + auto overlapped = GetOverlappedWithOffset(static_cast(offset)); if (count > std::numeric_limits::max()) { count = std::numeric_limits::max(); @@ -288,30 +288,33 @@ Result ReadFromFile(FileDescriptor fd, void* buf, size_t count, DWORD bytes_read; if (::ReadFile(fd, buf, static_cast(count), &bytes_read, &overlapped)) { - TS_DETAIL_LOG_END << " handle=" << fd << ", bytes_read=" << bytes_read; return static_cast(bytes_read); } - TS_DETAIL_LOG_ERROR << " handle=" << fd; - return StatusFromOsError(::GetLastError(), "Failed to read from file"); + auto status = StatusFromOsError(::GetLastError(), "Failed to read from file"); + return std::move(tspan).EndWithStatus(std::move(status)); } Result WriteToFile(FileDescriptor fd, const void* buf, size_t count) { - TS_DETAIL_LOG_BEGIN << " handle=" << fd << ", count=" << count; + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), + {{"handle", fd}, {"size", count}}); + if (count > std::numeric_limits::max()) { count = std::numeric_limits::max(); } DWORD num_written; if (::WriteFile(fd, buf, static_cast(count), &num_written, /*lpOverlapped=*/nullptr)) { - TS_DETAIL_LOG_END << " handle=" << fd; return static_cast(num_written); } - TS_DETAIL_LOG_ERROR << " handle=" << fd; - return StatusFromOsError(::GetLastError(), "Failed to write to file"); + auto status = StatusFromOsError(::GetLastError(), "Failed to write to file"); + return std::move(tspan).EndWithStatus(std::move(status)); } Result WriteCordToFile(FileDescriptor fd, absl::Cord value) { + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), + {{"handle", fd}, {"size", value.size()}}); + // If we switched to OVERLAPPED io on Windows, then using WriteFileGather // would be similar to the unix ::writev call. size_t value_remaining = value.size(); @@ -329,23 +332,26 @@ Result WriteCordToFile(FileDescriptor fd, absl::Cord value) { } absl::Status TruncateFile(FileDescriptor fd) { + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"handle", fd}}); + if (::SetEndOfFile(fd)) { return absl::OkStatus(); } - return StatusFromOsError(::GetLastError(), "Failed to truncate file"); + auto status = StatusFromOsError(::GetLastError(), "Failed to truncate file"); + return std::move(tspan).EndWithStatus(std::move(status)); } absl::Status RenameOpenFile(FileDescriptor fd, const std::string& old_name, const std::string& new_name) { - TS_DETAIL_LOG_BEGIN << " handle=" << fd - << ", old_name=" << QuoteString(old_name) - << ", new_name=" << QuoteString(new_name); + LoggedTraceSpan tspan( + __func__, detail_logging.Level(1), + {{"handle", fd}, {"old_name", old_name}, {"new_name", new_name}}); + std::wstring wpath_new; TENSORSTORE_RETURN_IF_ERROR(ConvertUTF8ToWindowsWide(new_name, wpath_new)); // Try using Posix semantics. if (RenameFilePosix(fd, wpath_new)) { - TS_DETAIL_LOG_END << " handle=" << fd; return absl::OkStatus(); } @@ -361,17 +367,19 @@ absl::Status RenameOpenFile(FileDescriptor fd, const std::string& old_name, // Try using MoveFileEx, which may not be atomic. if (::MoveFileExW(wpath_old.c_str(), wpath_new.c_str(), MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH)) { - TS_DETAIL_LOG_END << " handle=" << fd; return absl::OkStatus(); } - TS_DETAIL_LOG_ERROR << " handle=" << fd; - return StatusFromOsError(::GetLastError(), - "Failed to rename: ", QuoteString(old_name), - " to: ", QuoteString(new_name)); + auto status = StatusFromOsError(::GetLastError(), + "Failed to rename: ", QuoteString(old_name), + " to: ", QuoteString(new_name)); + return std::move(tspan).EndWithStatus(std::move(status)); } absl::Status DeleteOpenFile(FileDescriptor fd, const std::string& path) { + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), + {{"handle", fd}, {"path", path}}); + // This relies on the "POSIX Semantics" flag supported by Windows 10 in // order to remove the file from its containing directory as soon as the // handle is closed. However, after the call to @@ -380,7 +388,7 @@ absl::Status DeleteOpenFile(FileDescriptor fd, const std::string& path) { // result in the normal read/write paths failing with an error. To avoid // that problem, we first rename the file to a random name, with a suffix of // `kLockSuffix` to prevent it from being included in List results. - TS_DETAIL_LOG_BEGIN << " handle=" << fd << ", path=" << QuoteString(path); + unsigned int buf[5]; for (int i = 0; i < 5; ++i) { ::rand_s(&buf[i]); @@ -408,7 +416,6 @@ absl::Status DeleteOpenFile(FileDescriptor fd, const std::string& path) { } // Attempt to delete the open handle using posix semantics? if (DeleteFilePosix(fd)) { - TS_DETAIL_LOG_END << " handle=" << fd; return absl::OkStatus(); } #ifndef NDEBUG @@ -418,15 +425,16 @@ absl::Status DeleteOpenFile(FileDescriptor fd, const std::string& path) { #endif // The file has been renamed, so delete the renamed file. if (::DeleteFileW(wpath_temp.c_str())) { - TS_DETAIL_LOG_END << " handle=" << fd; return absl::OkStatus(); } - TS_DETAIL_LOG_ERROR << " handle=" << fd; - return StatusFromOsError(::GetLastError(), - "Failed to delete: ", QuoteString(path)); + auto status = StatusFromOsError(::GetLastError(), + "Failed to delete: ", QuoteString(path)); + return std::move(tspan).EndWithStatus(std::move(status)); } absl::Status DeleteFile(const std::string& path) { + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"path", path}}); + std::wstring wpath; TENSORSTORE_RETURN_IF_ERROR(ConvertUTF8ToWindowsWide(path, wpath)); UniqueFileDescriptor delete_fd(::CreateFileW( @@ -443,8 +451,8 @@ absl::Status DeleteFile(const std::string& path) { if (delete_fd.valid()) { return DeleteOpenFile(delete_fd.get(), path); } - return StatusFromOsError(::GetLastError(), - "Failed to delete: ", QuoteString(path)); + return std::move(tspan).EndWithStatus(StatusFromOsError( + ::GetLastError(), "Failed to delete: ", QuoteString(path))); } uint32_t GetDefaultPageSize() { @@ -462,19 +470,21 @@ Result MemmapFileReadOnly(FileDescriptor fd, size_t offset, return absl::InvalidArgumentError( "Offset must be a multiple of the default page size."); } + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), + {{"handle", fd}, {"offset", offset}, {"size", size}}); + if (size == 0) { ::BY_HANDLE_FILE_INFORMATION info; if (!::GetFileInformationByHandle(fd, &info)) { - TS_DETAIL_LOG_ERROR << " handle=" << fd; - return StatusFromOsError(::GetLastError(), - "Failed in GetFileInformationByHandle"); + return std::move(tspan).EndWithStatus(StatusFromOsError( + ::GetLastError(), "Failed in GetFileInformationByHandle")); } uint64_t file_size = GetSize(info); if (offset + size > file_size) { - return absl::OutOfRangeError( + return std::move(tspan).EndWithStatus(absl::OutOfRangeError( absl::StrCat("Requested offset ", offset, " + size ", size, - " exceeds file size ", file_size)); + " exceeds file size ", file_size))); } else if (size == 0) { size = file_size - offset; } @@ -483,16 +493,16 @@ Result MemmapFileReadOnly(FileDescriptor fd, size_t offset, UniqueFileDescriptor map_fd( ::CreateFileMappingA(fd, NULL, PAGE_READONLY, 0, 0, nullptr)); if (!map_fd.valid()) { - TS_DETAIL_LOG_ERROR << " handle=" << fd; - return StatusFromOsError(::GetLastError(), "Failed in CreateFileMappingA"); + return std::move(tspan).EndWithStatus( + StatusFromOsError(::GetLastError(), "Failed in CreateFileMappingA")); } void* address = ::MapViewOfFile( map_fd.get(), FILE_MAP_READ, static_cast(offset >> 32), static_cast(offset & 0xffffffff), size); if (!address) { - TS_DETAIL_LOG_ERROR << " handle=" << fd; - return StatusFromOsError(::GetLastError(), "Failed in MapViewOfFile"); + return std::move(tspan).EndWithStatus( + StatusFromOsError(::GetLastError(), "Failed in MapViewOfFile")); } { @@ -500,7 +510,6 @@ Result MemmapFileReadOnly(FileDescriptor fd, size_t offset, PrefetchVirtualMemory(GetCurrentProcess(), 1, &entry, 0); } - TS_DETAIL_LOG_END << " handle=" << fd; mmap_count.Increment(); mmap_bytes.IncrementBy(size); mmap_active.Increment(); @@ -518,27 +527,27 @@ MappedRegion::~MappedRegion() { } absl::Status FsyncFile(FileDescriptor fd) { - TS_DETAIL_LOG_BEGIN << " handle=" << fd; + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"handle", fd}}); + if (::FlushFileBuffers(fd)) { - TS_DETAIL_LOG_END << " handle=" << fd; return absl::OkStatus(); } - TS_DETAIL_LOG_ERROR << " handle=" << fd; - return StatusFromOsError(::GetLastError()); + auto status = StatusFromOsError(::GetLastError()); + return std::move(tspan).EndWithStatus(std::move(status)); } absl::Status GetFileInfo(FileDescriptor fd, FileInfo* info) { - TS_DETAIL_LOG_BEGIN << " handle=" << fd; + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"handle", fd}}); + if (::GetFileInformationByHandle(fd, info)) { - TS_DETAIL_LOG_END << " handle=" << fd; return absl::OkStatus(); } - TS_DETAIL_LOG_ERROR << " handle=" << fd; - return StatusFromOsError(::GetLastError()); + auto status = StatusFromOsError(::GetLastError()); + return std::move(tspan).EndWithStatus(std::move(status)); } absl::Status GetFileInfo(const std::string& path, FileInfo* info) { - TS_DETAIL_LOG_BEGIN << " path=" << QuoteString(path); + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"path", path}}); // The typedef uses BY_HANDLE_FILE_INFO, which includes device and index // metadata, and requires an open handle. @@ -553,16 +562,16 @@ absl::Status GetFileInfo(const std::string& path, FileInfo* info) { /*hTemplateFile=*/nullptr)); if (stat_fd.valid()) { if (::GetFileInformationByHandle(stat_fd.get(), info)) { - TS_DETAIL_LOG_END << " path=" << QuoteString(path); return absl::OkStatus(); } } - TS_DETAIL_LOG_ERROR << " path=" << QuoteString(path); - return StatusFromOsError(::GetLastError(), - "Failed to stat file: ", QuoteString(path)); + auto status = StatusFromOsError(::GetLastError(), + "Failed to stat file: ", QuoteString(path)); + return std::move(tspan).EndWithStatus(std::move(status)); } Result OpenDirectoryDescriptor(const std::string& path) { + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"path", path}}); std::wstring wpath; TENSORSTORE_RETURN_IF_ERROR(ConvertUTF8ToWindowsWide(path, wpath)); FileDescriptor fd = ::CreateFileW( @@ -573,13 +582,16 @@ Result OpenDirectoryDescriptor(const std::string& path) { /*dwFlagsAndAttributes=*/FILE_FLAG_BACKUP_SEMANTICS, /*hTemplateFile=*/nullptr); if (fd == FileDescriptorTraits::Invalid()) { - return StatusFromOsError(::GetLastError(), - "Failed to open directory: ", QuoteString(path)); + auto status = StatusFromOsError( + ::GetLastError(), "Failed to open directory: ", QuoteString(path)); + return std::move(tspan).EndWithStatus(std::move(status)); } + tspan.Log("fd", fd); return UniqueFileDescriptor(fd); } absl::Status MakeDirectory(const std::string& path) { + LoggedTraceSpan tspan(__func__, detail_logging.Level(1), {{"path", path}}); std::wstring wpath; TENSORSTORE_RETURN_IF_ERROR(ConvertUTF8ToWindowsWide(path, wpath)); if (::CreateDirectoryW(wpath.c_str(), @@ -587,8 +599,9 @@ absl::Status MakeDirectory(const std::string& path) { ::GetLastError() == ERROR_ALREADY_EXISTS) { return absl::OkStatus(); } - return StatusFromOsError(::GetLastError(), - "Failed to create directory: ", QuoteString(path)); + auto status = StatusFromOsError( + ::GetLastError(), "Failed to create directory: ", QuoteString(path)); + return std::move(tspan).EndWithStatus(std::move(status)); } absl::Status FsyncDirectory(FileDescriptor fd) { diff --git a/tensorstore/internal/thread/BUILD b/tensorstore/internal/thread/BUILD index 534788e0b..5f9607be4 100644 --- a/tensorstore/internal/thread/BUILD +++ b/tensorstore/internal/thread/BUILD @@ -71,9 +71,9 @@ tensorstore_cc_library( ":task", ":task_group_impl", "//tensorstore/internal:intrusive_ptr", + "//tensorstore/internal/tracing", "//tensorstore/util:executor", "@com_google_absl//absl/base:no_destructor", - "@com_google_absl//absl/log:absl_check", "@com_google_absl//absl/log:absl_log", ], ) @@ -179,6 +179,7 @@ tensorstore_cc_test( ":task", ":task_provider", "//tensorstore/internal:intrusive_ptr", + "//tensorstore/internal/tracing", "@com_google_absl//absl/base:core_headers", "@com_google_absl//absl/synchronization", "@com_google_googletest//:gtest_main", diff --git a/tensorstore/internal/thread/pool_impl_test.cc b/tensorstore/internal/thread/pool_impl_test.cc index 4bda298ee..7e41d7ee1 100644 --- a/tensorstore/internal/thread/pool_impl_test.cc +++ b/tensorstore/internal/thread/pool_impl_test.cc @@ -30,6 +30,7 @@ #include "tensorstore/internal/intrusive_ptr.h" #include "tensorstore/internal/thread/task.h" #include "tensorstore/internal/thread/task_provider.h" +#include "tensorstore/internal/tracing/trace_context.h" namespace { @@ -38,6 +39,7 @@ using ::tensorstore::internal::MakeIntrusivePtr; using ::tensorstore::internal_thread_impl::InFlightTask; using ::tensorstore::internal_thread_impl::SharedThreadPool; using ::tensorstore::internal_thread_impl::TaskProvider; +using TC = ::tensorstore::internal_tracing::TraceContext; struct SingleTaskProvider : public TaskProvider { struct private_t {}; @@ -99,7 +101,8 @@ TEST(SharedThreadPoolTest, Basic) { { absl::Notification notification; auto provider = SingleTaskProvider::Make( - pool, std::make_unique([&] { notification.Notify(); })); + pool, std::make_unique([&] { notification.Notify(); }, + TC(TC::kThread))); provider->Trigger(); provider->Trigger(); @@ -119,7 +122,8 @@ TEST(SharedThreadPoolTest, LotsOfProviders) { absl::BlockingCounter a(i); for (int j = 0; j < i; j++) { providers.push_back(SingleTaskProvider::Make( - pool, std::make_unique([&] { a.DecrementCount(); }))); + pool, std::make_unique([&] { a.DecrementCount(); }, + TC(TC::kThread)))); } for (auto& p : providers) p->Trigger(); a.Wait(); diff --git a/tensorstore/internal/thread/schedule_at.cc b/tensorstore/internal/thread/schedule_at.cc index 0dbf454f5..1a4818f4f 100644 --- a/tensorstore/internal/thread/schedule_at.cc +++ b/tensorstore/internal/thread/schedule_at.cc @@ -38,7 +38,7 @@ #include "tensorstore/internal/metrics/value.h" #include "tensorstore/internal/tagged_ptr.h" #include "tensorstore/internal/thread/thread.h" -#include "tensorstore/internal/tracing/tracing.h" +#include "tensorstore/internal/tracing/trace_context.h" #include "tensorstore/util/stop_token.h" using ::tensorstore::internal_metrics::MetricMetadata; diff --git a/tensorstore/internal/thread/task.h b/tensorstore/internal/thread/task.h index 19029e2f0..2998e043a 100644 --- a/tensorstore/internal/thread/task.h +++ b/tensorstore/internal/thread/task.h @@ -22,17 +22,17 @@ #include "absl/base/attributes.h" #include "absl/functional/any_invocable.h" #include "absl/time/clock.h" -#include "tensorstore/internal/tracing/tracing.h" +#include "tensorstore/internal/tracing/trace_context.h" namespace tensorstore { namespace internal_thread_impl { /// An in-flight task. Implementation detail of thread_pool. struct InFlightTask { - InFlightTask(absl::AnyInvocable callback) + InFlightTask(absl::AnyInvocable callback, + internal_tracing::TraceContext tc) : callback_(std::move(callback)), - tc_(internal_tracing::TraceContext( - internal_tracing::TraceContext::kThread)), + tc_(std::move(tc)), start_nanos(absl::GetCurrentTimeNanos()) {} void Run() { diff --git a/tensorstore/internal/thread/thread_pool.cc b/tensorstore/internal/thread/thread_pool.cc index 4a574506b..e9550b64d 100644 --- a/tensorstore/internal/thread/thread_pool.cc +++ b/tensorstore/internal/thread/thread_pool.cc @@ -23,18 +23,31 @@ #include #include "absl/base/no_destructor.h" -#include "absl/log/absl_check.h" #include "absl/log/absl_log.h" #include "tensorstore/internal/intrusive_ptr.h" #include "tensorstore/internal/thread/pool_impl.h" #include "tensorstore/internal/thread/task.h" #include "tensorstore/internal/thread/task_group_impl.h" +#include "tensorstore/internal/tracing/trace_context.h" #include "tensorstore/util/executor.h" namespace tensorstore { namespace internal { namespace { +struct DetachedPoolImpl { + internal::IntrusivePtr task_group; + + void operator()(ExecutorTask task, internal_tracing::TraceContext tc) const { + task_group->AddTask(std::make_unique( + std::move(task), std::move(tc))); + } + void operator()(ExecutorTask task) const { + operator()(std::move(task), internal_tracing::TraceContext( + internal_tracing::TraceContext::kThread)); + } +}; + Executor DefaultThreadPool(size_t num_threads) { static absl::NoDestructor pool_; intrusive_ptr_increment(pool_.get()); @@ -47,14 +60,10 @@ Executor DefaultThreadPool(size_t num_threads) { << num_threads; } - auto task_group = internal_thread_impl::TaskGroup::Make( + return DetachedPoolImpl{internal_thread_impl::TaskGroup::Make( internal::IntrusivePtr( pool_.get()), - num_threads); - return [task_group = std::move(task_group)](ExecutorTask task) { - task_group->AddTask( - std::make_unique(std::move(task))); - }; + num_threads)}; } } // namespace diff --git a/tensorstore/internal/tracing/BUILD b/tensorstore/internal/tracing/BUILD index d85bc5915..ed6b4c5bf 100644 --- a/tensorstore/internal/tracing/BUILD +++ b/tensorstore/internal/tracing/BUILD @@ -1,10 +1,47 @@ -load("//bazel:tensorstore.bzl", "tensorstore_cc_library") +load("//bazel:tensorstore.bzl", "tensorstore_cc_library", "tensorstore_cc_test") package(default_visibility = ["//tensorstore:internal_packages"]) licenses(["notice"]) +tensorstore_cc_library( + name = "span_attribute", + hdrs = [ + "span_attribute.h", + ], +) + tensorstore_cc_library( name = "tracing", - hdrs = ["tracing.h"], + srcs = [ + "logged_trace_span.cc", + "trace_span.cc", + ], + hdrs = [ + "logged_trace_span.h", + "trace_context.h", + "trace_span.h", + ], + deps = [ + ":span_attribute", + "//tensorstore/internal:source_location", + "//tensorstore/util:span", + "@com_google_absl//absl/log:log_streamer", + "@com_google_absl//absl/status", + "@com_google_absl//absl/strings:str_format", + "@com_google_absl//absl/time", + ], +) + +tensorstore_cc_test( + name = "trace_test", + srcs = ["trace_test.cc"], + deps = [ + ":span_attribute", + ":tracing", + "@com_google_absl//absl/base:log_severity", + "@com_google_absl//absl/log:scoped_mock_log", + "@com_google_absl//absl/status", + "@com_google_googletest//:gtest_main", + ], ) diff --git a/tensorstore/internal/tracing/logged_trace_span.cc b/tensorstore/internal/tracing/logged_trace_span.cc new file mode 100644 index 000000000..46e5e6997 --- /dev/null +++ b/tensorstore/internal/tracing/logged_trace_span.cc @@ -0,0 +1,45 @@ +// Copyright 2024 The TensorStore Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "tensorstore/internal/tracing/logged_trace_span.h" + +#include +#include + +#include "absl/strings/str_format.h" + +namespace tensorstore { +namespace internal_tracing { + +void LoggedTraceSpan::BeginLog(std::ostream& stream) { + stream << absl::StreamFormat("%x: Start %s", id_, method()); +} + +std::ostream& LoggedTraceSpan::EndLog(std::ostream& stream) { + stream << absl::StreamFormat("%x: End %s", id_, method()); + return stream; +} + +void LoggedTraceSpan::LogImpl(std::string_view name, const void* val, + std::ostream& stream) { + stream << absl::StreamFormat("%x: %s=%p", id_, name, val); +} + +void LoggedTraceSpan::LogImpl(std::string_view name, const char* val, + std::ostream& stream) { + stream << absl::StreamFormat("%x: %s=%s", id_, name, val); +} + +} // namespace internal_tracing +} // namespace tensorstore diff --git a/tensorstore/internal/tracing/logged_trace_span.h b/tensorstore/internal/tracing/logged_trace_span.h new file mode 100644 index 000000000..5b01d6133 --- /dev/null +++ b/tensorstore/internal/tracing/logged_trace_span.h @@ -0,0 +1,147 @@ +// Copyright 2024 The TensorStore Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef TENSORSTORE_INTERNAL_TRACING_LOGGED_TRACE_SPAN_H_ +#define TENSORSTORE_INTERNAL_TRACING_LOGGED_TRACE_SPAN_H_ + +#include + +#include +#include +#include +#include +#include + +#include "absl/log/log_streamer.h" +#include "absl/status/status.h" +#include "absl/strings/str_format.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" +#include "tensorstore/internal/source_location.h" +#include "tensorstore/internal/tracing/span_attribute.h" +#include "tensorstore/internal/tracing/trace_span.h" +#include "tensorstore/util/span.h" + +namespace tensorstore { +namespace internal_tracing { + +/// A TraceSpan which optionally includes scoped logging to ABSL INFO. +class LoggedTraceSpan : public TraceSpan { + // Generates a random ID for logging the Begin/End log messages. + inline uint64_t random_id() { + thread_local uint64_t id = absl::ToUnixNanos(absl::Now()); + // Apply xorshift64, which has a period of 2^64-1, to the per-thread id + // to generate the next id. + uint64_t x = id; + do { + x ^= x << 13; + x ^= x >> 7; + x ^= x << 17; + } while (x == 0); + return id = x; + } + + public: + LoggedTraceSpan(std::string_view method, bool log, + const SourceLocation& location = SourceLocation::current()) + : TraceSpan(method, location), + location_(location), + id_(log ? random_id() : 0) { + if (id_) + BeginLog(absl::LogInfoStreamer(location_.file_name(), location_.line()) + .stream()); + } + + LoggedTraceSpan(std::string_view method, bool log, + tensorstore::span attributes, + const SourceLocation& location = SourceLocation::current()) + : TraceSpan(method, attributes, location), + location_(location), + id_(log ? random_id() : 0) { + if (id_) + BeginLog(absl::LogInfoStreamer(location_.file_name(), location_.line()) + .stream(), + attributes); + } + + LoggedTraceSpan(std::string_view method, bool log, + std::initializer_list attributes, + const SourceLocation& location = SourceLocation::current()) + : LoggedTraceSpan(method, log, + tensorstore::span( + attributes.begin(), attributes.end()), + location) {} + + ~LoggedTraceSpan() { + if (id_) + EndLog(absl::LogInfoStreamer(location_.file_name(), location_.line()) + .stream()); + } + + /// Log an key=value pair with the current LoggedTraceSpan Id to the INFO log. + template + void Log(std::string_view name, T val, + const SourceLocation& location = SourceLocation::current()) { + if (id_) + LogImpl(name, val, + absl::LogInfoStreamer(location.file_name(), location.line()) + .stream()); + } + + /// Finish the span with a logged status. + absl::Status EndWithStatus( + absl::Status&& status, + const SourceLocation& location = SourceLocation::current()) && { + if (id_) { + EndLog( + absl::LogInfoStreamer(location.file_name(), location.line()).stream()) + << status; + id_ = 0; + } + return status; + } + + using TraceSpan::method; + + private: + void BeginLog(std::ostream& stream); + + void BeginLog(std::ostream& stream, + tensorstore::span attributes) { + BeginLog(stream); + for (const auto& attr : attributes) { + stream << absl::StreamFormat(", %s=", attr.name); + std::visit([&stream](auto v) { stream << v; }, attr.value); + } + } + + std::ostream& EndLog(std::ostream& stream); + + void LogImpl(std::string_view name, const void* val, std::ostream& stream); + void LogImpl(std::string_view name, const char* val, std::ostream& stream); + + template + std::enable_if_t, void> // + LogImpl(std::string_view name, T val, std::ostream& stream) { + stream << absl::StreamFormat("%x: %s=", id_, name) << val; + } + + SourceLocation location_; + uint64_t id_ = 0; +}; + +} // namespace internal_tracing +} // namespace tensorstore + +#endif // TENSORSTORE_INTERNAL_TRACING_LOGGED_TRACE_SPAN_H_ diff --git a/tensorstore/internal/tracing/span_attribute.h b/tensorstore/internal/tracing/span_attribute.h new file mode 100644 index 000000000..6ce4b0336 --- /dev/null +++ b/tensorstore/internal/tracing/span_attribute.h @@ -0,0 +1,64 @@ +// Copyright 2024 The TensorStore Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef TENSORSTORE_INTERNAL_TRACING_SPAN_ATTRIBUTE_H_ +#define TENSORSTORE_INTERNAL_TRACING_SPAN_ATTRIBUTE_H_ + +#include + +#include +#include + +namespace tensorstore { +namespace internal_tracing { + +/// Spans can have additional attributes added to them. +struct SpanAttribute { + std::string_view name; + std::variant value; + + SpanAttribute(std::string_view name, bool value) : name(name), value(value) {} + + SpanAttribute(std::string_view name, int value) + : name(name), value(static_cast(value)) {} + SpanAttribute(std::string_view name, long value) // NOLINT + : name(name), value(static_cast(value)) {} + SpanAttribute(std::string_view name, long long value) // NOLINT + : name(name), value(static_cast(value)) {} + + SpanAttribute(std::string_view name, unsigned int value) + : name(name), value(static_cast(value)) {} + SpanAttribute(std::string_view name, unsigned long value) // NOLINT + : name(name), value(static_cast(value)) {} + SpanAttribute(std::string_view name, unsigned long long value) // NOLINT + : name(name), value(static_cast(value)) {} + + SpanAttribute(std::string_view name, float value) + : name(name), value(static_cast(value)) {} + SpanAttribute(std::string_view name, double value) + : name(name), value(value) {} + + SpanAttribute(std::string_view name, std::string_view value) + : name(name), value(value) {} + SpanAttribute(std::string_view name, const char* value) + : name(name), value(std::string_view(value)) {} + + SpanAttribute(std::string_view name, void* value) + : name(name), value(value) {} +}; + +} // namespace internal_tracing +} // namespace tensorstore + +#endif // TENSORSTORE_INTERNAL_TRACING_SPAN_ATTRIBUTE_H_ diff --git a/tensorstore/internal/tracing/tracing.h b/tensorstore/internal/tracing/trace_context.h similarity index 84% rename from tensorstore/internal/tracing/tracing.h rename to tensorstore/internal/tracing/trace_context.h index 3eec59948..2f2d22eab 100644 --- a/tensorstore/internal/tracing/tracing.h +++ b/tensorstore/internal/tracing/trace_context.h @@ -23,8 +23,14 @@ namespace internal_tracing { struct TraceContext { struct ThreadInitType {}; inline static constexpr ThreadInitType kThread{}; - explicit TraceContext(ThreadInitType) {} + TraceContext() = delete; + explicit TraceContext(ThreadInitType) {} + + TraceContext(TraceContext&&) = default; + TraceContext& operator=(TraceContext&&) = default; + TraceContext(const TraceContext&) = default; + TraceContext& operator=(const TraceContext&) = default; }; inline void SwapCurrentTraceContext(TraceContext* context) {} diff --git a/tensorstore/internal/tracing/trace_span.cc b/tensorstore/internal/tracing/trace_span.cc new file mode 100644 index 000000000..8bb26e686 --- /dev/null +++ b/tensorstore/internal/tracing/trace_span.cc @@ -0,0 +1,15 @@ +// Copyright 2024 The TensorStore Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "tensorstore/internal/tracing/trace_span.h" diff --git a/tensorstore/internal/tracing/trace_span.h b/tensorstore/internal/tracing/trace_span.h new file mode 100644 index 000000000..97a87f75d --- /dev/null +++ b/tensorstore/internal/tracing/trace_span.h @@ -0,0 +1,57 @@ +// Copyright 2024 The TensorStore Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef TENSORSTORE_INTERNAL_TRACING_SPAN_H_ +#define TENSORSTORE_INTERNAL_TRACING_SPAN_H_ + +#include + +#include +#include + +#include "tensorstore/internal/source_location.h" +#include "tensorstore/internal/tracing/span_attribute.h" +#include "tensorstore/util/span.h" + +namespace tensorstore { +namespace internal_tracing { + +class TraceSpan { + public: + TraceSpan(std::string_view method, + const SourceLocation& location = SourceLocation::current()) + : method_(method) {} + + TraceSpan(std::string_view method, + tensorstore::span attributes, + const SourceLocation& location = SourceLocation::current()) + : TraceSpan(method, location) {} + + TraceSpan(std::string_view method, + std::initializer_list attributes, + const SourceLocation& location = SourceLocation::current()) + : TraceSpan(method, location) {} + + ~TraceSpan() = default; + + std::string_view method() const { return method_; } + + private: + std::string_view method_; +}; + +} // namespace internal_tracing +} // namespace tensorstore + +#endif // TENSORSTORE_INTERNAL_TRACING_SPAN_H_ diff --git a/tensorstore/internal/tracing/trace_test.cc b/tensorstore/internal/tracing/trace_test.cc new file mode 100644 index 000000000..ac75b6e92 --- /dev/null +++ b/tensorstore/internal/tracing/trace_test.cc @@ -0,0 +1,86 @@ +// Copyright 2024 The TensorStore Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include + +#include +#include +#include "absl/base/log_severity.h" +#include "absl/log/scoped_mock_log.h" +#include "absl/status/status.h" +#include "tensorstore/internal/tracing/logged_trace_span.h" +#include "tensorstore/internal/tracing/span_attribute.h" +#include "tensorstore/internal/tracing/trace_span.h" + +namespace { + +using ::tensorstore::internal_tracing::LoggedTraceSpan; +using ::tensorstore::internal_tracing::SpanAttribute; +using ::tensorstore::internal_tracing::TraceSpan; +using ::testing::_; +using ::testing::HasSubstr; + +TEST(TraceTest, Span) { + TraceSpan span("TraceSpan", + { + SpanAttribute{"int", 1}, + SpanAttribute{"string", "hello"}, + SpanAttribute{"uint", 1ull}, + SpanAttribute{"bool", true}, + SpanAttribute{"double", 1.0}, + SpanAttribute{"string_view", std::string_view("hello")}, + SpanAttribute{"void*", (void*)0}, + }); + + EXPECT_EQ(span.method(), "TraceSpan"); +} + +TEST(TraceTest, LoggedSpan) { + absl::ScopedMockLog log(absl::MockLogDefault::kDisallowUnexpected); + + EXPECT_CALL( + log, Log(absl::LogSeverity::kInfo, _, + HasSubstr("Start LoggedTraceSpan, int=1, string=hello, uint=1, " + "bool=true, double=1, string_view=hello, void*="))); + + EXPECT_CALL(log, Log(absl::LogSeverity::kInfo, _, HasSubstr("hello=world"))); + + EXPECT_CALL( + log, Log(absl::LogSeverity::kInfo, _, HasSubstr("End LoggedTraceSpan"))); + + log.StartCapturingLogs(); + + { + LoggedTraceSpan span("LoggedTraceSpan", true, + { + {"int", 1}, + {"string", "hello"}, + {"uint", 1ull}, + {"bool", true}, + {"double", 1.0}, + {"string_view", std::string_view("hello")}, + {"void*", (void*)0}, + }); + + EXPECT_EQ(span.method(), "LoggedTraceSpan"); + span.Log("hello", "world"); + + std::move(span).EndWithStatus(absl::OkStatus()).IgnoreError(); + } +} + +} // namespace diff --git a/tensorstore/util/BUILD b/tensorstore/util/BUILD index d628ee66c..a63e3568b 100644 --- a/tensorstore/util/BUILD +++ b/tensorstore/util/BUILD @@ -267,8 +267,8 @@ tensorstore_cc_library( name = "executor", hdrs = ["executor.h"], deps = [ - "//tensorstore/internal:type_traits", "//tensorstore/internal/poly", + "//tensorstore/internal/tracing", "@com_google_absl//absl/base:core_headers", "@com_google_absl//absl/functional:any_invocable", "@com_google_absl//absl/meta:type_traits", diff --git a/tensorstore/util/executor.h b/tensorstore/util/executor.h index dc775b404..2631e4d76 100644 --- a/tensorstore/util/executor.h +++ b/tensorstore/util/executor.h @@ -26,7 +26,7 @@ #include "absl/functional/any_invocable.h" #include "absl/meta/type_traits.h" #include "tensorstore/internal/poly/poly.h" -#include "tensorstore/internal/type_traits.h" +#include "tensorstore/internal/tracing/trace_context.h" namespace tensorstore { @@ -79,12 +79,16 @@ class ExecutorBoundFunction { template std::enable_if_t> // operator()(T&&... arg) { + internal_tracing::SwapCurrentTraceContext(&tc_); executor(std::bind(std::move(function), std::forward(arg)...)); + internal_tracing::SwapCurrentTraceContext(&tc_); } template std::enable_if_t> operator()( T&&... arg) const { + internal_tracing::SwapCurrentTraceContext(&tc_); executor(std::bind(function, std::forward(arg)...)); + internal_tracing::SwapCurrentTraceContext(&tc_); } /// Executor object. @@ -92,6 +96,9 @@ class ExecutorBoundFunction { /// Function object. ABSL_ATTRIBUTE_NO_UNIQUE_ADDRESS Function function; + + /// Trace context. + ABSL_ATTRIBUTE_NO_UNIQUE_ADDRESS mutable internal_tracing::TraceContext tc_; }; /// Returns an instance of `ExecutorBoundFunction` that invokes the given @@ -106,7 +113,9 @@ std::enable_if_t< ExecutorBoundFunction, absl::remove_cvref_t>> WithExecutor(Executor&& executor, Function&& function) { - return {std::forward(executor), std::forward(function)}; + return { + std::forward(executor), std::forward(function), + internal_tracing::TraceContext(internal_tracing::TraceContext::kThread)}; } template std::enable_if_t, InlineExecutor>, diff --git a/tensorstore/util/future_impl.h b/tensorstore/util/future_impl.h index be4734e0a..ee9748564 100644 --- a/tensorstore/util/future_impl.h +++ b/tensorstore/util/future_impl.h @@ -36,7 +36,7 @@ #include "tensorstore/internal/integer_sequence.h" #include "tensorstore/internal/intrusive_ptr.h" #include "tensorstore/internal/tagged_ptr.h" -#include "tensorstore/internal/tracing/tracing.h" +#include "tensorstore/internal/tracing/trace_context.h" #include "tensorstore/internal/type_traits.h" #include "tensorstore/util/result.h" @@ -528,10 +528,16 @@ class CallbackBase : public CallbackListNode { /// kForceCallback and kResultNotNeededCallback. kLinkCallback = 3; - explicit CallbackBase(SharedStatePointer shared_state) + CallbackBase(SharedStatePointer shared_state, + internal_tracing::TraceContext trace_context) : shared_state_(shared_state), reference_count_(2), - trace_context_(internal_tracing::TraceContext::kThread) {} + trace_context_(std::move(trace_context)) {} + + explicit CallbackBase(SharedStatePointer shared_state) + : CallbackBase(std::move(shared_state), + internal_tracing::TraceContext( + internal_tracing::TraceContext::kThread)) {} virtual ~CallbackBase(); diff --git a/third_party/com_google_absl/workspace.bzl b/third_party/com_google_absl/workspace.bzl index 6890676cd..9fce61293 100644 --- a/third_party/com_google_absl/workspace.bzl +++ b/third_party/com_google_absl/workspace.bzl @@ -81,7 +81,9 @@ ABSL_CMAKE_MAPPING = { "//absl/log:check": "absl::check", "//absl/log:die_if_null": "absl::die_if_null", "//absl/log:log_sink": "absl::log_sink", + "//absl/log:log_streamer": "absl::log_streamer", "//absl/log:log": "absl::log", + "//absl/log:scoped_mock_log": "absl::scoped_mock_log", "//absl/memory:memory": "absl::memory", "//absl/meta:meta": "absl::meta", "//absl/meta:type_traits": "absl::type_traits",