Skip to content

Commit

Permalink
add split_trace to allow for easy splitting of large traces for paral…
Browse files Browse the repository at this point in the history
…lel processing by gufi_trace2index
  • Loading branch information
calccrypto committed Dec 11, 2023
1 parent 014c5e5 commit bd45edc
Show file tree
Hide file tree
Showing 2 changed files with 228 additions and 0 deletions.
5 changes: 5 additions & 0 deletions contrib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ add_executable(gendir gendir.c)
target_link_libraries(gendir ${COMMON_LIBRARIES})
add_dependencies(gendir GUFI)

# utility for splitting large traces into smaller files for parallel processing
add_executable(split_trace split_trace.c)
target_link_libraries(split_trace ${COMMON_LIBRARIES})
add_dependencies(split_trace GUFI)

# potentially useful C++ executables
if (CMAKE_CXX_COMPILER)
# a more complex index generator
Expand Down
223 changes: 223 additions & 0 deletions contrib/split_trace.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
This file is part of GUFI, which is part of MarFS, which is released
under the BSD license.
Copyright (c) 2017, Los Alamos National Security (LANS), LLC
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors
may be used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
From Los Alamos National Security, LLC:
LA-CC-15-039
Copyright (c) 2017, Los Alamos National Security, LLC All rights reserved.
Copyright 2017. Los Alamos National Security, LLC. This software was produced
under U.S. Government contract DE-AC52-06NA25396 for Los Alamos National
Laboratory (LANL), which is operated by Los Alamos National Security, LLC for
the U.S. Department of Energy. The U.S. Government has rights to use,
reproduce, and distribute this software. NEITHER THE GOVERNMENT NOR LOS
ALAMOS NATIONAL SECURITY, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR
ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is
modified to produce derivative works, such modified software should be
clearly marked, so as not to confuse it with the version available from
LANL.
THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL LOS ALAMOS NATIONAL SECURITY, LLC OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
OF SUCH DAMAGE.
*/



#include <stdlib.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include "utils.h"
#include "QueuePerThreadPool.h"

/*
 * Value handed to getline_fd as its default size argument whenever a line
 * of the trace is read. The number itself is arbitrary (magic number).
 * Only used inside this file, so it is given internal linkage.
 */
static const size_t GETLINE_DEFAULT_SIZE = 256;

/*
 * One contiguous slice of the source trace. main() fills one of these in
 * for every output file and copy_range() consumes (and frees) it.
 */
struct Range {
    int    fd;     /* descriptor the bytes are copied from */
    off_t  start;  /* first byte of the slice (inclusive) */
    off_t  end;    /* one past the last byte of the slice (exclusive) */
    size_t id;     /* numeric suffix of the output file name */
};

/*
 * Thread pool work function: copy one byte range of the source trace into
 * its own output file, named "<prefix>.<range id>".
 *
 * ctx  - thread pool handle (unused)
 * id   - value supplied by the thread pool (unused)
 * data - heap allocated struct Range; it is freed here on every path
 * args - output file name prefix (main() hands argv[4] to QPTPool_init)
 *
 * Returns 0 when the range was copied and 1 on any failure, so that both
 * failure paths (open and copy) report the same value.
 */
static int copy_range(QPTPool_t *ctx, const size_t id, void *data, void *args) {
    (void) ctx; (void) id;

    struct Range *range = (struct Range *) data;
    const char *prefix = (char *) args;

    char outname[MAXPATH];
    SNPRINTF(outname, sizeof(outname), "%s.%zu", prefix, range->id);

    const int dst = open(outname, O_WRONLY | O_CREAT | O_TRUNC,
                         S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
    if (dst < 0) {
        /* save errno first: fprintf/strerror are allowed to change it */
        const int err = errno;
        fprintf(stderr, "Error: Unable to open file %s to write range [%jd, %jd): %s (%d)\n",
                outname, (intmax_t) range->start, (intmax_t) range->end, strerror(err), err);
        free(range);
        return 1;
    }

    /*
     * The original code treated any copyfd result greater than -1 as
     * success, so a negative result is taken to mean failure here.
     */
    const int failed = (copyfd(range->fd, range->start, dst, 0, range->end - range->start) < 0);
    if (failed) {
        fprintf(stderr, "Error: Failed to copy range [%jd, %jd) to %s\n",
                (intmax_t) range->start, (intmax_t) range->end, outname);
    }

    close(dst);
    free(range);

    return failed;
}

/*
 * Describe the byte range [start, end) of fd in a freshly allocated
 * struct Range and queue it for copy_range, which takes ownership of the
 * allocation and frees it.
 *
 * *id is used as the range id (the output file suffix) and is incremented.
 *
 * Returns 0 on success and 1 if the Range could not be allocated.
 */
static int enqueue_range(QPTPool_t *pool, const int fd,
                         const off_t start, const off_t end,
                         size_t *id) {
    struct Range *range = malloc(sizeof(*range));
    if (!range) {
        const int err = errno;
        fprintf(stderr, "Error: Could not allocate range [%jd, %jd): %s (%d)\n",
                (intmax_t) start, (intmax_t) end, strerror(err), err);
        return 1;
    }

    range->fd    = fd;
    range->start = start;
    range->end   = end;
    range->id    = (*id)++;

    /* as before, the already incremented id is what the pool receives */
    QPTPool_enqueue(pool, *id, copy_range, range);

    return 0;
}

/*
 * Split a trace file into at most max_split_count smaller files named
 * "<dst_trace_prefix>.<n>". Cuts are only made in front of lines whose
 * first delimiter is followed by 'd' (directory entries), so that every
 * output file starts with a directory line.
 *
 * Syntax: split_trace src_trace delim max_split_count dst_trace_prefix [threads]
 *
 * Returns 0 on success and 1 on bad arguments or setup/allocation errors.
 */
int main(int argc, char *argv[]) {
    if (argc < 5) {
        fprintf(stderr, "Syntax: %s src_trace delim max_split_count dst_trace_prefix [threads]\n", argv[0]);
        return 1;
    }

    const char *src = argv[1];
    const char delim = argv[2][0];
    size_t count = 0;
    char *prefix = argv[4];
    size_t threads = 1;

    /* an empty delimiter argument could never match anything useful */
    if (delim == '\0') {
        fprintf(stderr, "Error: Delimiter must not be empty\n");
        return 1;
    }

    /* count is used as a divisor below, so 0 has to be rejected */
    if ((sscanf(argv[3], "%zu", &count) != 1) || (count == 0)) {
        fprintf(stderr, "Error: Invalid output count: %s\n", argv[3]);
        return 1;
    }

    if (argc > 5) {
        if ((sscanf(argv[5], "%zu", &threads) != 1) || (threads == 0)) {
            fprintf(stderr, "Error: Invalid thread count: %s\n", argv[5]);
            return 1;
        }
    }

    /* open returns -1 on failure; 0 is a valid descriptor */
    const int fd = open(src, O_RDONLY);
    if (fd < 0) {
        const int err = errno;
        fprintf(stderr, "Error: Could not open source trace file %s: %s (%d)\n",
                src, strerror(err), err);
        return 1;
    }

    int rc = 0;

    struct stat st;
    if (fstat(fd, &st) != 0) {
        const int err = errno;
        fprintf(stderr, "Could not fstat trace file: %s (%d)\n",
                strerror(err), err);
        rc = 1;
        goto cleanup;
    }

    QPTPool_t *pool = QPTPool_init(threads, prefix);
    if (!pool || (QPTPool_start(pool) != 0)) {
        fprintf(stderr, "Error: Failed to start thread pool\n");
        if (pool) {
            /* release the pool the same way the success path does */
            QPTPool_destroy(pool);
        }
        rc = 1;
        goto cleanup;
    }

    /*
     * approximate number of bytes per chunk: ceil(size / count)
     *
     * Rounding up (rather than down) keeps the number of chunks at or
     * below count and keeps the jump non-zero for a non-empty file.
     */
    const size_t total = (size_t) st.st_size;
    const off_t jump = (off_t) ((total / count) + !!(total % count));

    size_t id = 0;
    off_t offset = 0;          /* start of the chunk currently being built */
    off_t end = offset + jump; /* current read position in the trace */
    int realign = 1;           /* read position is an arbitrary byte */

    while (end < st.st_size) {
        char *line = NULL;
        size_t size = 0;

        if (realign) {
            /*
             * A jump lands somewhere inside a line. Throw away the rest
             * of that line so the test below only ever sees whole lines;
             * otherwise a chunk could be cut in the middle of an entry.
             * (If the jump happens to land exactly on a line start, that
             * one line is skipped as a cut candidate, which is harmless.)
             */
            const off_t landed = end;
            const ssize_t skipped = getline_fd(&line, &size, fd, &end, GETLINE_DEFAULT_SIZE);
            free(line); /* free(NULL) is a no-op */
            if ((skipped < 0) || (end <= landed)) {
                break; /* read error or no forward progress */
            }
            realign = 0;
            continue;
        }

        const off_t start = end; /* offset of the line about to be read */

        /* signed, so that a negative (error) result is caught as well */
        const ssize_t len = getline_fd(&line, &size, fd, &end, GETLINE_DEFAULT_SIZE);
        if (len < 1) {
            free(line);
            break;
        }

        const char *first_delim = memchr(line, delim, (size_t) len);
        if (!first_delim) {
            free(line);
            fprintf(stderr, "Trace partitioning encountered bad line ending at offset %jd\n", (intmax_t) end);
            break;
        }

        /* find first directory and stop */
        const int is_dir = (first_delim[1] == 'd');
        free(line);

        if (is_dir) {
            if (enqueue_range(pool, fd, offset, start, &id) != 0) {
                rc = 1;
                break;
            }

            offset = start;
            end += jump; /* jump to next start location */
            realign = 1;
        }
    }

    /* whatever is left after the last cut becomes the final chunk */
    if (offset < st.st_size) {
        if (enqueue_range(pool, fd, offset, st.st_size, &id) != 0) {
            rc = 1;
        }
    }

    QPTPool_wait(pool);
    QPTPool_destroy(pool);

  cleanup:
    close(fd);

    return rc;
}

0 comments on commit bd45edc

Please sign in to comment.