Skip to content

Commit

Permalink
prefix column with column type and length when -u is passed in
Browse files Browse the repository at this point in the history
gufi_query and gufi_sqlite3
  • Loading branch information
calccrypto committed Jan 23, 2025
1 parent 8072d3e commit 5d2f329
Show file tree
Hide file tree
Showing 21 changed files with 275 additions and 88 deletions.
18 changes: 18 additions & 0 deletions include/bf.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,24 @@ struct input {
refstr_t fin;
} sql;

/*
* if outputting to STDOUT or OUTFILE, get list of
* types of final output to prefix columns with
*
* set up by gufi_query but cleaned up by input_fini
*/
struct {
int prefix;

/* set if not aggregating */
int *tsum;
int *sum;
int *ent;

/* set if aggregating */
int *agg;
} types;

int printdir;
int printing;
int printheader;
Expand Down
2 changes: 2 additions & 0 deletions include/dbutils.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ enum CheckRollupScore {
int bottomup_collect_treesummary(sqlite3 *db, const char *dirname, sll_t *subdirs,
const enum CheckRollupScore check_rollupscore);

int *get_col_types(sqlite3 *db, const refstr_t *sql, int *cols);

#ifdef __cplusplus
}
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ OF SUCH DAMAGE.



#ifndef GUFI_QUERY_VALIDATE_INPUTS_H
#define GUFI_QUERY_VALIDATE_INPUTS_H
#ifndef GUFI_QUERY_HANDLE_SQL_H
#define GUFI_QUERY_HANDLE_SQL_H

#include "bf.h"

int validate_inputs(struct input *in);
int handle_sql(struct input *in);

#endif
2 changes: 1 addition & 1 deletion include/gufi_query/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ OF SUCH DAMAGE.

void querydb(struct work *work,
const char *dbname, const size_t dbname_len,
sqlite3 *db, const char *query,
sqlite3 *db, const char *query, int *types,
PoolArgs_t *pa, int id,
int (*callback)(void *, int, char **, char**), int *rc);

Expand Down
1 change: 1 addition & 0 deletions include/print.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ typedef struct PrintArgs {
pthread_mutex_t *mutex; /* mutex for printing to stdout */
FILE *outfile;
size_t rows; /* number of rows returned by the query */
int *types; /* if types is set, prefix each column with a 1 char type (https://www.sqlite.org/c3ref/c_blob.html) and a 4 byte human readable length */
/* size_t printed; /\* number of records printed by the callback *\/ */
} PrintArgs_t;

Expand Down
2 changes: 1 addition & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,10 @@ add_library(gufi_query_lib OBJECT
gufi_query/aggregate.c
gufi_query/external.c
gufi_query/gqw.c
gufi_query/handle_sql.c
gufi_query/process_queries.c
gufi_query/processdir.c
gufi_query/query.c
gufi_query/validate_inputs.c
)

add_dependencies(gufi_query_lib GUFI)
Expand Down
10 changes: 10 additions & 0 deletions src/bf.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ struct input *input_init(struct input *in) {

void input_fini(struct input *in) {
if (in) {
free(in->types.agg);
free(in->types.ent);
free(in->types.sum);
free(in->types.tsum);
sll_destroy(&in->external_attach, free);
trie_free(in->skip);
}
Expand Down Expand Up @@ -156,6 +160,7 @@ void print_help(const char* prog_name,
case 'd': printf(" -d <delim> delimiter (one char) [use 'x' for 0x%02X]", (uint8_t)fielddelim); break;
case 'o': printf(" -o <out_fname> output file (one-per-thread, with thread-id suffix)"); break;
case 'O': printf(" -O <out_DB> output DB"); break;
case 'u': printf(" -u prefix output with 1 byte type and 4 byte human readable length"); break; /* need to use text to avoid \x0a confusion */
case 'I': printf(" -I <SQL_init> SQL init"); break;
case 'T': printf(" -T <SQL_tsum> SQL for tree-summary table"); break;
case 'S': printf(" -S <SQL_sum> SQL for summary table"); break;
Expand Down Expand Up @@ -208,6 +213,7 @@ void show_input(struct input* in, int retval) {
printf("in.maxthreads = %zu\n", in->maxthreads);
printf("in.delim = '%c'\n", in->delim);
printf("in.andor = %d\n", (int) in->andor);
printf("in.types.prefix = %d\n", in->types.prefix);
printf("in.process_xattrs = %d\n", in->process_xattrs);
printf("in.nobody.uid = %" STAT_uid "\n", in->nobody.uid);
printf("in.nobody.gid = %" STAT_gid "\n", in->nobody.gid);
Expand Down Expand Up @@ -353,6 +359,10 @@ int parse_cmd_line(int argc,
INSTALL_STR(&in->outname, optarg);
break;

case 'u':
in->types.prefix = 1;
break;

case 'I': // SQL initializations
INSTALL_STR(&in->sql.init, optarg);
break;
Expand Down
27 changes: 27 additions & 0 deletions src/dbutils.c
Original file line number Diff line number Diff line change
Expand Up @@ -1190,3 +1190,30 @@ int bottomup_collect_treesummary(sqlite3 *db, const char *dirname, sll_t *subdir

return inserttreesumdb(dirname, db, &tsum, 0, 0, 0);
}

int *get_col_types(sqlite3 *db, const refstr_t *sql, int *cols) {
/* parse sql */
sqlite3_stmt *stmt = NULL;
const int rc = sqlite3_prepare_v2(db, sql->data, sql->len, &stmt, NULL);
if (rc != SQLITE_OK) {
fprintf(stderr, "Error: Could not prepare '%s' for getting column types: %s (%d)\n", sql->data, sqlite3_errstr(rc), rc);
return NULL;
}

/* get column count */
*cols = sqlite3_column_count(stmt);
if (*cols == 0) {
fprintf(stderr, "Error: '%s' was detected to have 0 columns\n", sql->data);
sqlite3_finalize(stmt);
return NULL;
}

/* get each column's type */
int *types = malloc(*cols * sizeof(int));
for(int i = 0; i < *cols; i++) {
types[i] = sqlite3_column_type(stmt, i);
}

sqlite3_finalize(stmt);
return types;
}
14 changes: 8 additions & 6 deletions src/gufi_query/aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,14 @@ int aggregate_process(Aggregate_t *aggregate, struct input *in) {

/* normally expect STDOUT/OUTFILE to have SQL to run, but OUTDB can have SQL to run as well */
if ((in->output != OUTDB) || in->sql.agg.len) {
PrintArgs_t pa;
pa.output_buffer = &aggregate->ob;
pa.delim = in->delim;
pa.mutex = NULL;
pa.outfile = aggregate->outfile;
pa.rows = 0;
PrintArgs_t pa = {
.output_buffer = &aggregate->ob,
.delim = in->delim,
.mutex = NULL,
.outfile = aggregate->outfile,
.rows = 0,
.types = in->types.agg,
};

char *err = NULL;
if (sqlite3_exec(aggregate->db, in->sql.agg.data, print_parallel, &pa, &err) != SQLITE_OK) {
Expand Down
44 changes: 42 additions & 2 deletions src/gufi_query/validate_inputs.c → src/gufi_query/handle_sql.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,13 @@ OF SUCH DAMAGE.


#include <stdio.h>
#include <stdlib.h>

#include "gufi_query/validate_inputs.h"
#include "dbutils.h"
#include "gufi_query/handle_sql.h"
#include "template_db.h"

int validate_inputs(struct input *in) {
int handle_sql(struct input *in) {
/*
* - Leaves are final outputs
* - OUTFILE/OUTDB + aggregation will create per thread and final aggregation files
Expand Down Expand Up @@ -140,5 +143,42 @@ int validate_inputs(struct input *in) {
}
}

/* now that the SQL has been validated, generate types if necessary */
if ((in->types.prefix == 1) && ((in->output == STDOUT) || (in->output == OUTFILE))) {
/* have to create temporary db since there is no guarantee of a db yet */
sqlite3 *db = opendb(":memory:", SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, 0, 0, create_dbdb_tables, NULL);
if (!db) {
return -1;
}

int cols = 0; /* discarded */

/* if not aggregating, get types for T, S, and E */
if (!in->sql.init_agg.len) {
if (in->sql.tsum.len) {
in->types.tsum = get_col_types(db, &in->sql.tsum, &cols);
}
if (in->sql.sum.len) {
in->types.sum = get_col_types(db, &in->sql.sum, &cols);
}
if (in->sql.ent.len) {
in->types.ent = get_col_types(db, &in->sql.ent, &cols);
}
}
/* types for G */
else {
char *err = NULL;
if (sqlite3_exec(db, in->sql.init_agg.data, NULL, NULL, &err) != SQLITE_OK) {
fprintf(stderr, "Error: Init failed while getting column types: %s\n", err);
sqlite3_free(err);
closedb(db);
return -1;
}
in->types.agg = get_col_types(db, &in->sql.agg, &cols);
}

closedb(db);
}

return 0;
}
6 changes: 3 additions & 3 deletions src/gufi_query/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ OF SUCH DAMAGE.

#include "gufi_query/aggregate.h"
#include "gufi_query/gqw.h"
#include "gufi_query/handle_sql.h"
#include "gufi_query/processdir.h"
#include "gufi_query/validate_inputs.h"

static void sub_help(void) {
printf("GUFI_index find GUFI index here\n");
Expand All @@ -93,9 +93,9 @@ int main(int argc, char *argv[])
/* Callers provide the options-string for get_opt(), which will */
/* control which options are parsed for each program. */
struct input in;
process_args_and_maybe_exit("hHvT:S:E:an:jo:d:O:I:F:y:z:J:K:G:mB:wxk:M:s:" COMPRESS_OPT "Q:", 1, "GUFI_index ...", &in);
process_args_and_maybe_exit("hHvT:S:E:an:jo:d:O:uI:F:y:z:J:K:G:mB:wxk:M:s:" COMPRESS_OPT "Q:", 1, "GUFI_index ...", &in);

if (validate_inputs(&in) != 0) {
if (handle_sql(&in) != 0) {
input_fini(&in);
return EXIT_FAILURE;
}
Expand Down
4 changes: 2 additions & 2 deletions src/gufi_query/process_queries.c
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ int process_queries(PoolArgs_t *pa,
if (in->sql.sum.len) {
recs=1; /* set this to one record - if the sql succeeds it will set to 0 or 1 */
/* put in the path relative to the user's input */
querydb(&gqw->work, dbname, dbname_len, db, in->sql.sum.data, pa, id, print_parallel, &recs);
querydb(&gqw->work, dbname, dbname_len, db, in->sql.sum.data, in->types.sum, pa, id, print_parallel, &recs);
} else {
recs = 1;
}
Expand All @@ -265,7 +265,7 @@ int process_queries(PoolArgs_t *pa,
/* if we have recs (or are running an OR) query the entries table */
if (recs > 0) {
if (in->sql.ent.len) {
querydb(&gqw->work, dbname, dbname_len, db, in->sql.ent.data, pa, id, print_parallel, &recs); /* recs is not used */
querydb(&gqw->work, dbname, dbname_len, db, in->sql.ent.data, in->types.ent, pa, id, print_parallel, &recs); /* recs is not used */
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/gufi_query/processdir.c
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,14 @@ int processdir(QPTPool_t *ctx, const size_t id, void *data, void *args) {
if (in->andor == AND) {
/* make sure the treesummary table exists */
querydb(&gqw->work, dbname, dbname_len, db, "SELECT name FROM " ATTACH_NAME ".sqlite_master "
"WHERE (type == 'table') AND (name == '" TREESUMMARY "');",
"WHERE (type == 'table') AND (name == '" TREESUMMARY "');", NULL,
pa, id, count_rows, &recs);
if (recs < 1) {
recs = -1;
}
else {
/* run in->sql.tsum */
querydb(&gqw->work, dbname, dbname_len, db, in->sql.tsum.data, pa, id, print_parallel, &recs);
querydb(&gqw->work, dbname, dbname_len, db, in->sql.tsum.data, in->types.tsum, pa, id, print_parallel, &recs);
}
}
/* this is an OR or we got a record back. go on to summary/entries */
Expand Down
16 changes: 9 additions & 7 deletions src/gufi_query/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,18 @@ OF SUCH DAMAGE.
/* wrapper wround sqlite3_exec to pass arguments and check for errors */
void querydb(struct work *work,
const char *dbname, const size_t dbname_len,
sqlite3 *db, const char *query,
sqlite3 *db, const char *query, int *types,
PoolArgs_t *pa, int id,
int (*callback)(void *, int, char **, char**), int *rc) {
ThreadArgs_t *ta = &pa->ta[id];
PrintArgs_t args;
args.output_buffer = &ta->output_buffer;
args.delim = pa->in->delim;
args.mutex = pa->stdout_mutex;
args.outfile = ta->outfile;
args.rows = 0;
PrintArgs_t args = {
.output_buffer = &ta->output_buffer,
.delim = pa->in->delim,
.mutex = pa->stdout_mutex,
.outfile = ta->outfile,
.rows = 0,
.types = types,
};

char *err = NULL;
#ifdef SQL_EXEC
Expand Down
32 changes: 30 additions & 2 deletions src/gufi_sqlite3.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ static void sub_help(void) {

int main(int argc, char *argv[]) {
struct input in;
process_args_and_maybe_exit("hvd:", 0, "[db [SQL]...]", &in);
process_args_and_maybe_exit("hvd:u", 0, "[db [SQL]...]", &in);

const int args_left = argc - idx;
const char *dbname = (args_left == 0)?SQLITE_MEMORY:argv[idx++];
Expand All @@ -95,7 +95,6 @@ int main(int argc, char *argv[]) {
}

addqueryfuncs(db);
addhistfuncs(db);

/* no buffering */
struct OutputBuffer ob;
Expand All @@ -107,6 +106,7 @@ int main(int argc, char *argv[]) {
.mutex = NULL,
.outfile = stdout,
.rows = 0,
.types = NULL,
};

char *err = NULL;
Expand All @@ -116,19 +116,47 @@ int main(int argc, char *argv[]) {
char *line = NULL;
size_t len = 0;
while (getline(&line, &len, stdin) != -1) {
if (in.types.prefix) {
refstr_t sql = {
.data = line,
.len = strlen(line),
};
int cols = 0;
pa.types = get_col_types(db, &sql, &cols);
}

if (sqlite3_exec(db, line, print_parallel, &pa, &err) != SQLITE_OK) {
sqlite_print_err_and_free(err, stderr, "Error: SQL error: %s\n", err);
free(pa.types);
pa.types = NULL;
break;
}

free(pa.types);
pa.types = NULL;
}
free(line);
}
else {
for(int i = idx; i < argc; i++) {
if (in.types.prefix) {
refstr_t sql = {
.data = argv[i],
.len = strlen(argv[i]),
};
int cols = 0;
pa.types = get_col_types(db, &sql, &cols);
}

if (sqlite3_exec(db, argv[i], print_parallel, &pa, &err) != SQLITE_OK) {
sqlite_print_err_and_free(err, stderr, "Error: SQL error: %s\n", err);
free(pa.types);
pa.types = NULL;
break;
}

free(pa.types);
pa.types = NULL;
}
}

Expand Down
Loading

0 comments on commit 5d2f329

Please sign in to comment.