Skip to content

Commit

Permalink
add split_trace to allow for easy splitting of large traces for paral…
Browse files Browse the repository at this point in the history
…lel processing by gufi_trace2index
  • Loading branch information
calccrypto committed Dec 12, 2023
1 parent 4ba5ba2 commit c0fd844
Show file tree
Hide file tree
Showing 6 changed files with 417 additions and 0 deletions.
5 changes: 5 additions & 0 deletions contrib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ add_executable(gendir gendir.c)
target_link_libraries(gendir ${COMMON_LIBRARIES})
add_dependencies(gendir GUFI)

# utility for spliting large traces into smaller files for parallel processing
add_executable(split_trace split_trace.c)
target_link_libraries(split_trace ${COMMON_LIBRARIES})
add_dependencies(split_trace GUFI)

# potentially useful C++ executables
if (CMAKE_CXX_COMPILER)
# a more complex index generator
Expand Down
249 changes: 249 additions & 0 deletions contrib/split_trace.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
/*
This file is part of GUFI, which is part of MarFS, which is released
under the BSD license.
Copyright (c) 2017, Los Alamos National Security (LANS), LLC
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors
may be used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
From Los Alamos National Security, LLC:
LA-CC-15-039
Copyright (c) 2017, Los Alamos National Security, LLC All rights reserved.
Copyright 2017. Los Alamos National Security, LLC. This software was produced
under U.S. Government contract DE-AC52-06NA25396 for Los Alamos National
Laboratory (LANL), which is operated by Los Alamos National Security, LLC for
the U.S. Department of Energy. The U.S. Government has rights to use,
reproduce, and distribute this software. NEITHER THE GOVERNMENT NOR LOS
ALAMOS NATIONAL SECURITY, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR
ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is
modified to produce derivative works, such modified software should be
clearly marked, so as not to confuse it with the version available from
LANL.
THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL LOS ALAMOS NATIONAL SECURITY, LLC OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
OF SUCH DAMAGE.
*/



#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include "QueuePerThreadPool.h"
#include "utils.h"

const size_t GETLINE_DEFAULT_SIZE = 256; /* magic number */

struct Range {
int fd;
off_t start, end; /* [start, end) */
size_t id;
};

static int copy_range(QPTPool_t *ctx, const size_t id, void *data, void *args) {
(void) ctx; (void) id;

struct Range *range = (struct Range *) data;
const char *prefix = (char *) args;

char outname[MAXPATH];
SNPRINTF(outname, sizeof(outname), "%s.%zu", prefix, range->id);

int dst = open(outname, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (dst < 0) {
const int err = errno;
fprintf(stderr, "Error: Unable to open file %s to write range [%jd, %jd): %s (%d)\n",
outname, (intmax_t) range->start, (intmax_t) range->end, strerror(err), errno);
free(range);
return 1;
}

const off_t size = range->end - range->start;

/* ignore errors */
ftruncate(dst, size);

const int rc = (copyfd(range->fd, range->start, dst, 0, size) > -1);

close(dst);
free(range);

return rc;
}

int main(int argc, char *argv[]) {
if (argc < 5) {
fprintf(stderr, "Syntax: %s src_trace delim max_split_count dst_trace_prefix [threads]\n", argv[0]);
return 1;
}

const char *src = argv[1];
const char delim = argv[2][0];
size_t count = 0;
char *prefix = argv[4];
size_t threads = 1;

if (sscanf(argv[3], "%zu", &count) != 1) {
fprintf(stderr, "Error: Invalid output count: %s\n", argv[2]);
return 1;
}

if (argc > 5) {
if (sscanf(argv[5], "%zu", &threads) != 1) {
fprintf(stderr, "Error: Invalid thread count: %s\n", argv[5]);
return 1;
}
}

int rc = 0;

const int fd = open(src, O_RDONLY);
if (fd < 0) {
const int err = errno;
fprintf(stderr, "Error: Could not open source trace file %s: %s (%d)\n",
src, strerror(err), err);
rc = 1;
goto cleanup;
}

struct stat st;
if (fstat(fd, &st) != 0) {
const int err = errno;
fprintf(stderr, "Could not fstat trace file: %s (%d)\n",
strerror(err), err);
rc = 1;
goto cleanup;
}

QPTPool_t *pool = QPTPool_init(threads, prefix);
if (QPTPool_start(pool) != 0) {
fprintf(stderr, "Error: Failed to start thread pool\n");
rc = 1;
goto cleanup;
}

/* approximate number of bytes per chunk */
const size_t default_size = (st.st_size / count) + !(st.st_size % count);

size_t id = 0;
off_t offset = 0;
off_t end = offset + default_size;
while (end < st.st_size) {
off_t start = 0;

char *line = NULL;
size_t size = 0;
ssize_t len = 0;

/*
* drop first line - assume started in middle of line, and
* can't be sure if first delimiter found is actually
* separator for first and second columns
*/
if ((len = getline_fd(&line, &size, fd, &end, GETLINE_DEFAULT_SIZE)) < 1) {
break;
}

line = NULL;
size = 0;
len = 0;

char *first_delim = NULL;
while (!first_delim || (first_delim[1] != 'd')) {
if ((len = getline_fd(&line, &size, fd, &end, GETLINE_DEFAULT_SIZE)) < 1) {
break;
}

first_delim = memchr(line, delim, len);
if (!first_delim) {
free(line);
line = NULL;
size = 0;
len = 0;
break;
}
}

if ((len < 1) || !first_delim) {
rc = (len < 0) || !first_delim;
goto wait;
}

/* if the current line is the start of a stanza, load work and skip to next area to search */
if (first_delim[1] == 'd') {
struct Range *range = malloc(sizeof(*range));
range->fd = fd;
range->start = offset;
range->end = start;
range->id = id++;

QPTPool_enqueue(pool, id, copy_range, range);

offset = start;
end += default_size; /* jump to next start location */
}

free(line);
}

if (offset < st.st_size) {
struct Range *range = malloc(sizeof(*range));
range->fd = fd;
range->start = offset;
range->end = st.st_size;
range->id = id++;

QPTPool_enqueue(pool, 0, copy_range, range);
}

wait:
QPTPool_wait(pool);
QPTPool_destroy(pool);

cleanup:
close(fd);

return rc || (id == 0);
}
1 change: 1 addition & 0 deletions test/regression/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ set(EXAMPLES

set(OTHERS
bash_completion
split_trace
)

if (ZLIB_FOUND)
Expand Down
2 changes: 2 additions & 0 deletions test/regression/setup.sh.in
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ GUFI_TREESUMMARY_ALL="@CMAKE_BINARY_DIR@/src/gufi_treesummary_all"
GUFI_UNROLLUP="@CMAKE_BINARY_DIR@/src/gufi_unrollup"
OLDBIGFILES="@CMAKE_BINARY_DIR@/examples/oldbigfiles"
QUERYDBS="@CMAKE_BINARY_DIR@/src/querydbs"
SPLIT_TRACE="@CMAKE_BINARY_DIR@/contrib/split_trace"
USERFILESPACEHOG="@CMAKE_BINARY_DIR@/examples/userfilespacehog"
USERFILESPACEHOGUSESUMMARY="@CMAKE_BINARY_DIR@/examples/userfilespacehogusesummary"
VERIFYTRACE="@CMAKE_BINARY_DIR@/contrib/verifytrace"
Expand Down Expand Up @@ -203,6 +204,7 @@ replace() {
s/${GUFI_UNROLLUP//\//\\/}/gufi_unrollup/g;
s/${OLDBIGFILES//\//\\/}/oldbigfiles/g;
s/${QUERYDBS//\//\\/}/querydbs/g;
s/${SPLIT_TRACE//\//\\/}/split_trace/g;
s/${USERFILESPACEHOG//\//\\/}/userfilespacehog/g;
s/${USERFILESPACEHOGUSESUMMARY//\//\\/}/userfilespacehogusesummary/g;
s/${VERIFYTRACE//\//\\/}/verifytrace/g;
Expand Down
23 changes: 23 additions & 0 deletions test/regression/split_trace.expected
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# split_trace help
$ split_trace
Syntax: split_trace src_trace delim max_split_count dst_trace_prefix [threads]

Split empty trace file
$ "split_trace" "trace.0" "|" "4" "split" || true

Expecting 0 trace files. Found 0.

$ gufi_dir2trace -d "|" -n 1 -x "prefix" "trace"
"trace.0" Already exists!
Creating GUFI Traces trace with 1 threads
Total Dirs: 5
Total Files: 14

$ split_trace "trace.0" "|" 4 "split"

$ diff "trace.0" "split"

# Split non-existant trace
$ split_trace "badtrace" "|" 4 "split" || true
Error: Could not open source trace file badtrace: No such file or directory (2)

Loading

0 comments on commit c0fd844

Please sign in to comment.