Is it possible to call SZ_Finalize on a per-task basis, or to provide a thread-safe API? #86

Open
JasonRuonanWang opened this issue Mar 25, 2022 · 15 comments

Comments

@JasonRuonanWang

We are currently running into segfaults when compressing and decompressing in multiple threads that call the SZ API. When a compression or decompression task in one thread finishes and we call SZ_Finalize, it releases all the memory buffers SZ has allocated, including those allocated in other threads, which then causes a segfault. If we don't call SZ_Finalize, memory leaks. Any suggestions on handling this kind of multithreaded workflow? Thanks.

@robertu94
Collaborator

SZ_Finalize is supposed to be called at program startup and shutdown (that is, SZ_Init once at startup and SZ_Finalize once at shutdown). In LibPressio we provide two solutions to this. The first is an atomic reference-counting scheme, external to SZ but internal to LibPressio, that uses reader-writer locks to ensure the integrity of SZ's global metadata. It also detects whether a user has called SZ_Init without going through LibPressio and, if so, won't deallocate when the reference count reaches zero; unfortunately, it can't prevent users from calling SZ_Finalize on their own. The second relies on a thread-safe API that was added in very new versions of SZ2 and supports a small subset of the functionality; we wrap it as sz_threadsafe. If you feel you must implement this version checking and complexity yourself, I suggest referring to the LibPressio compressor plugins. You can find the thread-safe plugin here: https://github.com/robertu94/libpressio/blob/master/src/plugins/compressors/sz_threadsafe_plugin.cc
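The first LibPressio approach described above can be sketched roughly as follows. This is not LibPressio's code: `hypothetical_global_init`/`hypothetical_global_finalize` are placeholders for `SZ_Init`/`SZ_Finalize` so the example runs without SZ, the count is guarded by the lock instead of being a `std::atomic`, and it does not attempt LibPressio's detection of a user-side `SZ_Init`. It needs C++17 for `std::shared_mutex`.

```cpp
#include <cassert>
#include <mutex>
#include <shared_mutex>

// Hypothetical stand-ins for SZ_Init / SZ_Finalize: they only record calls so
// this sketch compiles and runs without SZ. A real wrapper would call SZ here.
int g_init_calls = 0;
int g_finalize_calls = 0;
bool g_state_live = false;

void hypothetical_global_init() {
  ++g_init_calls;
  g_state_live = true;
}

void hypothetical_global_finalize() {
  ++g_finalize_calls;
  g_state_live = false;
}

// One reader-writer lock guards both the reference count and the library's
// global state (the count is protected by the lock rather than std::atomic).
std::shared_mutex g_state_mutex;
int g_ref_count = 0;

// Each task holds a LibraryRef while it uses the library: only the first
// acquisition initializes, and only the last release finalizes.
class LibraryRef {
 public:
  LibraryRef() {
    std::unique_lock<std::shared_mutex> lock(g_state_mutex);
    if (g_ref_count++ == 0) hypothetical_global_init();
  }

  ~LibraryRef() {
    std::unique_lock<std::shared_mutex> lock(g_state_mutex);
    if (--g_ref_count == 0) hypothetical_global_finalize();
  }

  LibraryRef(const LibraryRef&) = delete;
  LibraryRef& operator=(const LibraryRef&) = delete;

  // Shared access: many threads may compress/decompress with the current
  // global settings at the same time.
  std::shared_lock<std::shared_mutex> lock_for_reading() const {
    return std::shared_lock<std::shared_mutex>(g_state_mutex);
  }

  // Exclusive access: required before changing any global setting.
  std::unique_lock<std::shared_mutex> lock_for_writing() const {
    return std::unique_lock<std::shared_mutex>(g_state_mutex);
  }
};
```

A task would create a `LibraryRef`, hold `lock_for_reading()` around compress/decompress calls, and take `lock_for_writing()` before changing global settings. Because `std::shared_mutex` is not recursive, a thread should not construct or destroy a `LibraryRef` while it still holds one of those locks.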

SZ3 removes the equivalent of SZ_Init and SZ_Finalize and is thread-safe from the start. It also implements a feature set that does not fully overlap with SZ2's, and its API is a breaking change, as the major version bump suggests. It will eventually support decompressing data compressed with SZ2; that functionality will come in a minor release.
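To illustrate why dropping the global `SZ_Init`/`SZ_Finalize` pair removes the problem, here is a minimal sketch of a stateless API in which each call carries its own configuration. It is not SZ3's API: `Config`, `toy_compress`, and `toy_decompress` are hypothetical, and the codec is plain uniform quantization rather than SZ's predictor-based algorithm.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>
#include <vector>

// Hypothetical per-call configuration: everything the codec needs is passed
// with each call, so there is no global state to initialize or finalize.
struct Config {
  double abs_error_bound;
};

// Toy codec (plain uniform quantization, NOT SZ's algorithm). Bins of width
// 2 * bound keep |x - decoded(x)| <= bound, up to float rounding.
std::vector<std::int64_t> toy_compress(const Config& conf,
                                       const std::vector<float>& data) {
  const double width = 2.0 * conf.abs_error_bound;
  std::vector<std::int64_t> codes;
  codes.reserve(data.size());
  for (float x : data) codes.push_back(std::llround(x / width));
  return codes;
}

std::vector<float> toy_decompress(const Config& conf,
                                  const std::vector<std::int64_t>& codes) {
  const double width = 2.0 * conf.abs_error_bound;
  std::vector<float> out;
  out.reserve(codes.size());
  for (std::int64_t c : codes) out.push_back(static_cast<float>(c * width));
  return out;
}
```

Because each call receives its own `Config` and touches no shared mutable state, different threads can use different error bounds at the same time without locks or any global setup.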

@robertu94
Collaborator

One last comment: WITHOUT the thread-safe API, using any error-bound mode other than ABS concurrently is undefined behavior; at best it crashes, at worst it silently produces wrong results. The ABS error bound can be used safely from multiple threads only if the SAME error bound is passed to all threads. Mixing different error bounds concurrently with the NON-thread-safe API is also undefined behavior.

The thread-safe API alleviates these problems, but it supports only a proper subset of the functionality.
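The one concurrent pattern described above as safe for the non-thread-safe API (a single ABS bound shared by all threads) amounts to "configure once, then fan out". The sketch below models it with a hypothetical global `g_params` and a toy quantizer instead of SZ, so it only illustrates the ordering constraints; it does not make SZ2 thread-safe in general.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical analogue of a library that keeps its settings in a global
// variable, as the non-thread-safe SZ2 API is described as doing.
struct GlobalParams {
  double abs_error_bound = 0.0;
};
GlobalParams g_params;

// Toy stand-in for compress + decompress that reads the global bound on every
// call. Concurrent calls are safe only while nobody writes g_params.
float roundtrip_with_global_bound(float x) {
  const double width = 2.0 * g_params.abs_error_bound;
  return static_cast<float>(std::llround(x / width) * width);
}

// "Configure once, then fan out": the bound is written before any worker
// starts, every worker uses that SAME bound, and all workers are joined before
// the global could be changed again. Requires num_threads >= 1.
std::vector<float> process_in_parallel(const std::vector<float>& data,
                                       double abs_bound, unsigned num_threads) {
  g_params.abs_error_bound = abs_bound;  // single write, before threads start
  std::vector<float> out(data.size());
  std::vector<std::thread> workers;
  for (unsigned t = 0; t < num_threads; ++t) {
    workers.emplace_back([&data, &out, t, num_threads] {
      // Each worker writes a disjoint set of indices, so `out` is race-free.
      for (std::size_t i = t; i < data.size(); i += num_threads)
        out[i] = roundtrip_with_global_bound(data[i]);
    });
  }
  for (std::thread& w : workers) w.join();
  return out;
}
```

Changing `g_params` while any worker is still running would reintroduce exactly the undefined behavior described above.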

@JasonRuonanWang
Author

> SZ_Finalize is supposed to be called at program startup and shutdown.

That is what I thought initially, but without calling SZ_Finalize there is a memory leak, and memory consumption keeps growing. We have workflows that compress and decompress very large data over a large number of time steps; with this leak, none of these intensive workflows can run to completion.

I am wondering where I can get the SZ3 code? It sounds like that's an ultimate solution.

@robertu94
Collaborator

robertu94 commented Mar 25, 2022

> there is a memory leak, causing the memory consumption to go all the way up.

This is a bug, and we should probably fix it if practical. It probably won't happen before the SC deadline. Can you send us an example of the data buffer that triggers the bug, and the metadata (size, type, etc.), so we can reproduce it?

> I am wondering where I can get the SZ3 code? It sounds like that's an ultimate solution.

Here is where you can get the SZ3 code. You can find an example of its usage here: https://github.com/robertu94/libpressio/blob/master/src/plugins/compressors/sz3.cc . Alternatively, I would like to work with you and your team to make the LibPressio integration easier to use. If someone from your team offers to maintain MGARD in Spack, I would like to submit a PR to Spack to add it, so that it could then be used from ADIOS when ADIOS is built with Spack.

@JasonRuonanWang
Author

> This is a bug, and we should probably fix it if practical. It probably won't happen before the SC deadline. Can you send us an example of the data buffer that triggers the bug, and the metadata (size, type, etc.), so we can reproduce it?

It doesn't need any specific code to reproduce. Simply take any SZ test or hello-world program, leave out the SZ_Finalize call, put all the compress/decompress operations in a while(true) loop, and watch the OS memory monitor.

@disheng222
Collaborator

Hi Jason,
I tested compression in a loop, but I didn't find the memory leak.
I generated the test code by modifying testfloat_compress.c as follows:

........
for(int i = 0;i<10;i++)
{
    cost_start();
    unsigned char *bytes = SZ_compress(SZ_FLOAT, data, &outSize, r5, r4, r3, r2, r1);
    cost_end();
    printf("timecost=%f\n",totalCost);
    writeByteData(bytes, outSize, outputFilePath, &status);
    if(status != SZ_SCES)
    {
        printf("Error: data file %s cannot be written!\n", outputFilePath);
        exit(0);
    }
    free(bytes);
}
printf("done\n");
free(data);
SZ_Finalize();

FYI, I am using the latest version (master branch).
I used valgrind to check for memory leaks, as shown below:

[sdi@localhost example]$ valgrind --leak-check=yes ./testfloat_compress sz.config ./testdata/x86/testfloat_8_8_128.dat 8 8 128
==300870== Memcheck, a memory error detector
==300870== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==300870== Using Valgrind-3.16.1 and LibVEX; rerun with -h for copyright info
==300870== Command: ./testfloat_compress sz.config ./testdata/x86/testfloat_8_8_128.dat 8 8 128
==300870==
cfgFile=sz.config
timecost=0.246747
timecost=0.131899
timecost=0.133552
timecost=0.132713
timecost=0.130486
timecost=0.134804
timecost=0.132354
timecost=0.130603
timecost=0.132210
timecost=0.132958
done
==300870==
==300870== HEAP SUMMARY:
==300870== in use at exit: 0 bytes in 0 blocks
==300870== total heap usage: 1,568 allocs, 1,568 frees, 979,064,155 bytes allocated
==300870==
==300870== All heap blocks were freed -- no leaks are possible
==300870==
==300870== For lists of detected and suppressed errors, rerun with: -s
==300870== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

I attached the test code (testfloat_compress2.c) and sz.config in case you want to reproduce my result.

Best,
Sheng

@JasonRuonanWang
Author

@disheng222 Can you try the following code? Let it run for 10 minutes and watch an operating system resource monitor. Then uncomment the two SZ_Finalize() calls and watch it again. Thanks.

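#include <algorithm>
#include <cmath>      // std::pow
#include <cstdint>    // uint8_t
#include <cstdlib>    // free
#include <cstring>
#include <functional>
#include <iostream>
#include <map>
#include <numeric>
#include <stdexcept>  // std::invalid_argument, std::runtime_error
#include <string>
#include <vector>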
extern "C" {
#include <sz/sz.h>
}

using Dims=std::vector<size_t>;
using Params=std::map<std::string,std::string>;

int N=100;

size_t Compress(const char *dataIn, char *bufferOut, const Params &parameters)
{
    const uint8_t bufferVersion = 1;
    size_t bufferOutOffset = 0;

    const size_t ndims = 2;

    sz_params sz;
    memset(&sz, 0, sizeof(sz_params));
    sz.max_quant_intervals = 65536;
    sz.quantization_intervals = 0;
    sz.sol_ID = SZ;
    sz.sampleDistance = 100;
    sz.predThreshold = 0.99;
    sz.szMode = SZ_BEST_COMPRESSION; // SZ_BEST_SPEED; //SZ_BEST_COMPRESSION;
    sz.gzipMode = 1;
    sz.errorBoundMode = ABS;
    sz.absErrBound = 1E-4;
    sz.relBoundRatio = 1E-3;
    sz.psnr = 80.0;
    sz.pw_relBoundRatio = 1E-5;
    sz.segment_size = static_cast<int>(std::pow(5., static_cast<double>(ndims)));
    sz.pwr_type = SZ_PWR_MIN_TYPE;


    /* SZ parameters */
    int use_configfile = 0;
    std::string sz_configfile = "sz.config";

    Params::const_iterator it;
    for (it = parameters.begin(); it != parameters.end(); it++)
    {
        if (it->first == "init")
        {
            use_configfile = 1;
            sz_configfile = std::string(it->second);
        }
        else if (it->first == "max_quant_intervals")
        {
            sz.max_quant_intervals = std::stoi(it->second);
        }
        else if (it->first == "quantization_intervals")
        {
            sz.quantization_intervals = std::stoi(it->second);
        }
        else if (it->first == "sol_ID")
        {
            sz.sol_ID = std::stoi(it->second);
        }
        else if (it->first == "sampleDistance")
        {
            sz.sampleDistance = std::stoi(it->second);
        }
        else if (it->first == "predThreshold")
        {
            sz.predThreshold = std::stof(it->second);
        }
        else if (it->first == "szMode")
        {
            int szMode = SZ_BEST_SPEED;
            if (it->second == "SZ_BEST_SPEED")
            {
                szMode = SZ_BEST_SPEED;
            }
            else if (it->second == "SZ_BEST_COMPRESSION")
            {
                szMode = SZ_BEST_COMPRESSION;
            }
            else if (it->second == "SZ_DEFAULT_COMPRESSION")
            {
                szMode = SZ_DEFAULT_COMPRESSION;
            }
            else
            {
                throw std::invalid_argument(
                    "ERROR: ADIOS2 operator unknown SZ parameter szMode: " +
                    it->second + "\n");
            }
            sz.szMode = szMode;
        }
        else if (it->first == "gzipMode")
        {
            sz.gzipMode = std::stoi(it->second);
        }
        else if (it->first == "errorBoundMode")
        {
            int errorBoundMode = ABS;
            if (it->second == "ABS")
            {
                errorBoundMode = ABS;
            }
            else if (it->second == "REL")
            {
                errorBoundMode = REL;
            }
            else if (it->second == "ABS_AND_REL")
            {
                errorBoundMode = ABS_AND_REL;
            }
            else if (it->second == "ABS_OR_REL")
            {
                errorBoundMode = ABS_OR_REL;
            }
            else if (it->second == "PW_REL")
            {
                errorBoundMode = PW_REL;
            }
            else
            {
                throw std::invalid_argument("ERROR: ADIOS2 operator "
                                            "unknown SZ parameter "
                                            "errorBoundMode: " +
                                            it->second + "\n");
            }
            sz.errorBoundMode = errorBoundMode;
        }
        else if (it->first == "absErrBound")
        {
            sz.absErrBound = std::stof(it->second);
        }
        else if (it->first == "relBoundRatio")
        {
            sz.relBoundRatio = std::stof(it->second);
        }
        else if (it->first == "pw_relBoundRatio")
        {
            sz.pw_relBoundRatio = std::stof(it->second);
        }
        else if (it->first == "segment_size")
        {
            sz.segment_size = std::stoi(it->second);
        }
        else if (it->first == "pwr_type")
        {
            int pwr_type = SZ_PWR_MIN_TYPE;
            // Compare the parameter's value (it->second), not its key (it->first)
            if ((it->second == "MIN") || (it->second == "SZ_PWR_MIN_TYPE"))
            {
                pwr_type = SZ_PWR_MIN_TYPE;
            }
            else if ((it->second == "AVG") || (it->second == "SZ_PWR_AVG_TYPE"))
            {
                pwr_type = SZ_PWR_AVG_TYPE;
            }
            else if ((it->second == "MAX") || (it->second == "SZ_PWR_MAX_TYPE"))
            {
                pwr_type = SZ_PWR_MAX_TYPE;
            }
            else
            {
                throw std::invalid_argument("ERROR: ADIOS2 operator "
                                            "unknown SZ parameter "
                                            "pwr_type: " +
                                            it->second + "\n");
            }
            sz.pwr_type = pwr_type;
        }
        else if ((it->first == "abs") || (it->first == "absolute") ||
                 (it->first == "accuracy"))
        {
            sz.errorBoundMode = ABS;
            sz.absErrBound = std::stod(it->second);
        }
        else if ((it->first == "rel") || (it->first == "relative"))
        {
            sz.errorBoundMode = REL;
            sz.relBoundRatio = std::stof(it->second);
        }
        else if ((it->first == "pw") || (it->first == "pwr") ||
                 (it->first == "pwrel") || (it->first == "pwrelative"))
        {
            sz.errorBoundMode = PW_REL;
            sz.pw_relBoundRatio = std::stof(it->second);
        }
    }

    if (use_configfile)
    {
        SZ_Init(sz_configfile.c_str());
    }
    else
    {
        SZ_Init_Params(&sz);
    }

    // Get type info
    int dtype = SZ_FLOAT;

    size_t szBufferSize;
    auto *szBuffer = SZ_compress( dtype, const_cast<char *>(dataIn), &szBufferSize, 0, 0, 0, 0, N);
    std::memcpy(bufferOut + bufferOutOffset, szBuffer, szBufferSize);
    bufferOutOffset += szBufferSize;
    free(szBuffer);
    szBuffer = nullptr;
//SZ_Finalize();
    return bufferOutOffset;
}

size_t Decompress(const char *bufferIn, const size_t sizeIn, char *dataOut)
{
    size_t bufferInOffset = 0;

    int dtype = SZ_FLOAT;
    size_t dataTypeSize=4;

    const size_t dataSizeBytes = N*sizeof(float);

    void *result = SZ_decompress(dtype, reinterpret_cast<unsigned char *>( const_cast<char *>(bufferIn + bufferInOffset)), sizeIn - bufferInOffset, 0, 0, 0, 0, N);

    if (result == nullptr)
    {
        throw std::runtime_error("ERROR: SZ_decompress failed\n");
    }
    std::memcpy(dataOut, result, dataSizeBytes);
    free(result);
    result = nullptr;
//SZ_Finalize();
    return dataSizeBytes;
}

int main()
{

    std::vector<float> dataOriginal(N);
    std::vector<float> dataCompressed(N);
    std::vector<float> dataDecompressed(N);

    for(int i=0;i<N;++i)
    {
        dataOriginal[i]=(float)i*0.01;
    }

    for(int i=0;i<1000000000;++i)
    {
        size_t size = Compress(reinterpret_cast<char*>(dataOriginal.data()), reinterpret_cast<char*>(dataCompressed.data()), {{"accuracy","0.01"}});
        Decompress(reinterpret_cast<char*>(dataCompressed.data()), size, reinterpret_cast<char*>(dataDecompressed.data()));
    }

    for(int i=0;i<N/10;++i) // print the N values as N/10 rows of 10, staying within bounds
    {
        for(int j=0;j<10;++j)
        {
            std::cout << dataOriginal[i*10+j] << " : " << dataDecompressed[i*10+j] << ",  ";
        }
        std::cout << std::endl;
    }

    return 0;
}

@robertu94
Collaborator

robertu94 commented Mar 25, 2022

@JasonRuonanWang It is expected that the code you sent will leak. It calls SZ_Init multiple times without calling SZ_Finalize. In the non-threadsafe version of SZ you can access the parameters of the compressor via the confparams_cpr global variable. This is how you reconfigure the compressor without invoking SZ_Init using the non-threadsafe API.
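The suggestion above (initialize once, then reconfigure through the global instead of calling SZ_Init again) can be sketched as follows. To keep it runnable without SZ, the library is modeled by hypothetical `hypothetical_init`/`hypothetical_finalize` functions and a global `g_conf` pointer standing in for `confparams_cpr`; in real SZ2 code you would presumably set fields such as `absErrBound` on `confparams_cpr`, but check `sz.h` for your version. As noted in this thread, this is only valid when no other thread is compressing or decompressing at the same time.

```cpp
#include <cassert>
#include <cmath>
#include <vector>

// Hypothetical model of a library whose init() heap-allocates its global
// parameters and whose finalize() frees them. Names are illustrative only.
struct Params {
  double abs_error_bound = 1e-4;
};
Params* g_conf = nullptr;     // plays the role of a global like confparams_cpr
int g_live_param_blocks = 0;  // Params allocations not yet freed

void hypothetical_init() {
  g_conf = new Params();      // a second call without finalize() leaks the first
  ++g_live_param_blocks;
}

void hypothetical_finalize() {
  delete g_conf;
  g_conf = nullptr;
  --g_live_param_blocks;
}

// Toy compress + decompress that honours the current global bound.
float hypothetical_roundtrip(float x) {
  const double width = 2.0 * g_conf->abs_error_bound;
  return static_cast<float>(std::llround(x / width) * width);
}

// Single-threaded pattern: initialize once, retune the global bound for each
// task instead of calling init() again, and finalize once at the end.
std::vector<float> run_tasks(float value, const std::vector<double>& bounds) {
  hypothetical_init();
  std::vector<float> results;
  for (double bound : bounds) {
    g_conf->abs_error_bound = bound;  // reconfigure, do not re-initialize
    results.push_back(hypothetical_roundtrip(value));
  }
  hypothetical_finalize();
  return results;
}
```

Calling `hypothetical_init()` inside the loop instead, as the reproducer above does with SZ_Init, would leave one unfreed `Params` block per iteration.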

@JasonRuonanWang
Author

> @JasonRuonanWang It is expected that the code you sent will leak. It calls SZ_Init multiple times without calling SZ_Finalize. In the non-threadsafe version of SZ you can access the parameters of the compressor via the confparams_cpr global variable. This is how you reconfigure the compressor without invoking SZ_Init using the non-threadsafe API.

Okay, I see. In that case, what happens if I modify the parameters in one thread while another thread is compressing or decompressing?

@robertu94
Collaborator

It will do evil things to you 😈 😢. Without the thread-safe API, this is undefined behavior in SZ2; if you are lucky, it will segfault.

@JasonRuonanWang
Copy link
Author

Right. Then it seems like SZ3 is the only way to go. Is SZ3 backward compatible with SZ1-compressed data?

@robertu94
Copy link
Collaborator

@disheng222 ☝🏻 I believe this is what Franck wants? I know that backward compatibility for decompressing SZ2 data is planned, but I wasn't sure whether it extends back to SZ1.

@JasonRuonanWang
Copy link
Author

For compression, I understand that SZ_Init and SZ_Finalize must be called in pairs. But in my code above, the decompression routine never calls SZ_Init, and it still works. In this case, should I call SZ_Finalize after decompression? If I do, there is no SZ_Init matching it, which makes the code look very odd.

Generally, in a library, Init() and Finalize() should always be called in pairs: if some functionality works without calling Init(), it should not require Finalize() either. But in SZ's case, if I decide that SZ_Finalize() should not be called because SZ_Init() was never called, there is going to be a memory leak.

I guess this is what I was trying to ask initially.

@JasonRuonanWang
Author

In our original ADIOS2 implementation, we did the following:

  1. SZ_Init()

  2. SZ_compress()

  3. SZ_Finalize()

  4. SZ_decompress(), which worked after SZ_Finalize() but without another SZ_Init()

  5. ???? Should we call SZ_Finalize() here? If so, there is no SZ_Init() matching it; if not, there is a memory leak.

@disheng222
Collaborator

disheng222 commented Oct 11, 2022 via email


3 participants