diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index 5a5eeac35480..d52ad74b9912 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -120,7 +120,6 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS shuffle/RandomPartitioner.cc shuffle/RoundRobinPartitioner.cc shuffle/ShuffleMemoryPool.cc - shuffle/ShuffleReader.cc shuffle/ShuffleWriter.cc shuffle/SinglePartitioner.cc shuffle/Spill.cc diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index b4359bab4160..4e1d1f09fd1e 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -1132,7 +1132,6 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrapper jlong shuffleReaderHandle) { JNI_METHOD_START auto reader = ObjectStore::retrieve(shuffleReaderHandle); - GLUTEN_THROW_NOT_OK(reader->close()); ObjectStore::release(shuffleReaderHandle); JNI_METHOD_END() } diff --git a/cpp/core/shuffle/ShuffleReader.cc b/cpp/core/shuffle/ShuffleReader.cc deleted file mode 100644 index ced80b3de13f..000000000000 --- a/cpp/core/shuffle/ShuffleReader.cc +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "ShuffleReader.h" -#include "arrow/ipc/reader.h" -#include "arrow/record_batch.h" -#include "utils/Macros.h" - -#include - -#include "ShuffleSchema.h" - -namespace gluten { - -ShuffleReader::ShuffleReader(std::unique_ptr factory) : factory_(std::move(factory)) {} - -std::shared_ptr ShuffleReader::readStream(std::shared_ptr in) { - return std::make_shared(factory_->createDeserializer(in)); -} - -arrow::Status ShuffleReader::close() { - return arrow::Status::OK(); -} - -arrow::MemoryPool* ShuffleReader::getPool() const { - return factory_->getPool(); -} - -int64_t ShuffleReader::getDecompressTime() const { - return factory_->getDecompressTime(); -} - -ShuffleWriterType ShuffleReader::getShuffleWriterType() const { - return factory_->getShuffleWriterType(); -} - -int64_t ShuffleReader::getDeserializeTime() const { - return factory_->getDeserializeTime(); -} - -} // namespace gluten diff --git a/cpp/core/shuffle/ShuffleReader.h b/cpp/core/shuffle/ShuffleReader.h index 0f985c7da939..6e2b079fc73b 100644 --- a/cpp/core/shuffle/ShuffleReader.h +++ b/cpp/core/shuffle/ShuffleReader.h @@ -17,63 +17,22 @@ #pragma once -#include "memory/ColumnarBatch.h" - -#include -#include - -#include "Options.h" #include "compute/ResultIterator.h" -#include "utils/Compression.h" namespace gluten { -class DeserializerFactory { - public: - virtual ~DeserializerFactory() = default; - - virtual std::unique_ptr createDeserializer(std::shared_ptr in) = 0; - - virtual arrow::MemoryPool* getPool() = 0; - - virtual int64_t getDecompressTime() = 0; - - virtual int64_t getDeserializeTime() = 0; - - virtual ShuffleWriterType getShuffleWriterType() = 0; -}; - class ShuffleReader { public: - explicit ShuffleReader(std::unique_ptr factory); - virtual ~ShuffleReader() = default; // FIXME iterator should be unique_ptr or un-copyable singleton - virtual std::shared_ptr readStream(std::shared_ptr in); - - arrow::Status close(); - - int64_t getDecompressTime() const; - - int64_t getIpcTime() const; - - int64_t getDeserializeTime() const; - - arrow::MemoryPool* getPool() const; - - ShuffleWriterType getShuffleWriterType() const; + virtual std::shared_ptr readStream(std::shared_ptr in) = 0; - protected: - arrow::MemoryPool* pool_; - int64_t decompressTime_ = 0; - int64_t deserializeTime_ = 0; + virtual int64_t getDecompressTime() const = 0; - ShuffleWriterType shuffleWriterType_; + virtual int64_t getDeserializeTime() const = 0; - private: - std::shared_ptr schema_; - std::unique_ptr factory_; + virtual arrow::MemoryPool* getPool() const = 0; }; } // namespace gluten diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 20c3dec939a0..2a2ea929c1ce 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -251,7 +251,7 @@ std::shared_ptr VeloxRuntime::createShuffleReader( auto codec = gluten::createArrowIpcCodec(options.compressionType, options.codecBackend); auto ctxVeloxPool = memoryManager()->getLeafMemoryPool(); auto veloxCompressionType = facebook::velox::common::stringToCompressionKind(options.compressionTypeStr); - auto deserializerFactory = std::make_unique( + auto deserializerFactory = std::make_unique( schema, std::move(codec), veloxCompressionType, diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index 0407be736a70..3aba7cf0fc3c 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -15,13 +15,13 @@ * limitations under the License. */ -#include "VeloxShuffleReader.h" -#include "GlutenByteStream.h" +#include "shuffle/VeloxShuffleReader.h" #include #include #include "memory/VeloxColumnarBatch.h" +#include "shuffle/GlutenByteStream.h" #include "shuffle/Payload.h" #include "shuffle/Utils.h" #include "utils/Common.h" @@ -576,7 +576,7 @@ std::shared_ptr VeloxRssSortShuffleReaderDeserializer::next() { return std::make_shared(std::move(rowVector)); } -VeloxColumnarBatchDeserializerFactory::VeloxColumnarBatchDeserializerFactory( +VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory( const std::shared_ptr& schema, const std::shared_ptr& codec, const facebook::velox::common::CompressionKind veloxCompressionType, @@ -598,7 +598,7 @@ VeloxColumnarBatchDeserializerFactory::VeloxColumnarBatchDeserializerFactory( initFromSchema(); } -std::unique_ptr VeloxColumnarBatchDeserializerFactory::createDeserializer( +std::unique_ptr VeloxShuffleReaderDeserializerFactory::createDeserializer( std::shared_ptr in) { switch (shuffleWriterType_) { case ShuffleWriterType::kHashShuffle: @@ -635,23 +635,19 @@ std::unique_ptr VeloxColumnarBatchDeserializerFactory::cr } } -arrow::MemoryPool* VeloxColumnarBatchDeserializerFactory::getPool() { +arrow::MemoryPool* VeloxShuffleReaderDeserializerFactory::getPool() { return memoryPool_; } -ShuffleWriterType VeloxColumnarBatchDeserializerFactory::getShuffleWriterType() { - return shuffleWriterType_; -} - -int64_t VeloxColumnarBatchDeserializerFactory::getDecompressTime() { +int64_t VeloxShuffleReaderDeserializerFactory::getDecompressTime() { return decompressTime_; } -int64_t VeloxColumnarBatchDeserializerFactory::getDeserializeTime() { +int64_t VeloxShuffleReaderDeserializerFactory::getDeserializeTime() { return deserializeTime_; } -void VeloxColumnarBatchDeserializerFactory::initFromSchema() { +void VeloxShuffleReaderDeserializerFactory::initFromSchema() { GLUTEN_ASSIGN_OR_THROW(auto arrowColumnTypes, toShuffleTypeId(schema_->fields())); isValidityBuffer_.reserve(arrowColumnTypes.size()); for (size_t i = 0; i < arrowColumnTypes.size(); ++i) { @@ -681,7 +677,23 @@ void VeloxColumnarBatchDeserializerFactory::initFromSchema() { } } -VeloxShuffleReader::VeloxShuffleReader(std::unique_ptr factory) - : ShuffleReader(std::move(factory)) {} +VeloxShuffleReader::VeloxShuffleReader(std::unique_ptr factory) + : factory_(std::move(factory)) {} + +std::shared_ptr VeloxShuffleReader::readStream(std::shared_ptr in) { + return std::make_shared(factory_->createDeserializer(in)); +} + +arrow::MemoryPool* VeloxShuffleReader::getPool() const { + return factory_->getPool(); +} + +int64_t VeloxShuffleReader::getDecompressTime() const { + return factory_->getDecompressTime(); +} + +int64_t VeloxShuffleReader::getDeserializeTime() const { + return factory_->getDeserializeTime(); +} } // namespace gluten diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h index af35f977127f..8ebdbf2bacab 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.h +++ b/cpp/velox/shuffle/VeloxShuffleReader.h @@ -17,16 +17,14 @@ #pragma once -#include "operators/serializer/VeloxColumnarBatchSerializer.h" #include "shuffle/Payload.h" #include "shuffle/ShuffleReader.h" #include "shuffle/VeloxSortShuffleWriter.h" -#include "utils/Timer.h" + +#include "velox/serializers/PrestoSerializer.h" #include "velox/type/Type.h" #include "velox/vector/ComplexVector.h" -#include - namespace gluten { class VeloxHashShuffleReaderDeserializer final : public ColumnarBatchIterator { @@ -134,9 +132,9 @@ class VeloxRssSortShuffleReaderDeserializer : public ColumnarBatchIterator { std::shared_ptr in_; }; -class VeloxColumnarBatchDeserializerFactory : public DeserializerFactory { +class VeloxShuffleReaderDeserializerFactory { public: - VeloxColumnarBatchDeserializerFactory( + VeloxShuffleReaderDeserializerFactory( const std::shared_ptr& schema, const std::shared_ptr& codec, const facebook::velox::common::CompressionKind veloxCompressionType, @@ -147,15 +145,13 @@ class VeloxColumnarBatchDeserializerFactory : public DeserializerFactory { std::shared_ptr veloxPool, ShuffleWriterType shuffleWriterType); - std::unique_ptr createDeserializer(std::shared_ptr in) override; - - arrow::MemoryPool* getPool() override; + std::unique_ptr createDeserializer(std::shared_ptr in); - int64_t getDecompressTime() override; + arrow::MemoryPool* getPool(); - int64_t getDeserializeTime() override; + int64_t getDecompressTime(); - ShuffleWriterType getShuffleWriterType() override; + int64_t getDeserializeTime(); private: void initFromSchema(); @@ -180,6 +176,17 @@ class VeloxColumnarBatchDeserializerFactory : public DeserializerFactory { class VeloxShuffleReader final : public ShuffleReader { public: - VeloxShuffleReader(std::unique_ptr factory); + VeloxShuffleReader(std::unique_ptr factory); + + std::shared_ptr readStream(std::shared_ptr in) override; + + int64_t getDecompressTime() const override; + + int64_t getDeserializeTime() const override; + + arrow::MemoryPool* getPool() const override; + + private: + std::unique_ptr factory_; }; } // namespace gluten diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h index 9331a7207858..4fcab8f24271 100644 --- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h +++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h @@ -360,7 +360,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam( + auto deserializerFactory = std::make_unique( schema, std::move(codec), veloxCompressionType,