Skip to content

Commit

Permalink
write to stdout instead of db file
Browse files Browse the repository at this point in the history
columns all have TEXT type, so sorting by numerical columns requires casting
  • Loading branch information
calccrypto committed Jan 17, 2025
1 parent 68b8b63 commit 83e84b6
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 208 deletions.
279 changes: 132 additions & 147 deletions src/gufi_vt.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ static int addvtfuncs(sqlite3 *db) {
);
}

#define delim "|" /* record separator */

typedef struct gufi_query_sql {
const char *I;
const char *T;
Expand All @@ -162,28 +164,10 @@ typedef struct gufi_query_sql {
* everything to link dynamically
*/
static int gufi_query_aggregate_db(const char *indexroot, const char *threads, const gq_sql_t *sql,
const char *prefix, char **outdb, char **errmsg) {
/* allow for aggregate databases to be placed anywhere */
const size_t outdb_len = (prefix?strlen(prefix):0) + 15;
*outdb = sqlite3_malloc(outdb_len + 1);
if (prefix) {
snprintf(*outdb, outdb_len + 1, "%s/gufi_vt.XXXXXX", prefix);
}
else {
snprintf(*outdb, outdb_len + 1, "gufi_vt.XXXXXX");
}

const int fd = mkstemp(*outdb);
if (fd < 0) {
const int err = errno;
*errmsg = sqlite3_mprintf("mkstemp('%s') failed: %s (%d)", *outdb, strerror(err), err);
goto error;
}
close(fd);

FILE **output, char **errmsg) {
const char *argv[23] = {
"gufi_query",
"-O", *outdb,
"-d", delim,
};

#define set_argv(argc, argv, flag, value) if (value) { argv[argc++] = flag; argv[argc++] = value; }
Expand All @@ -202,48 +186,30 @@ static int gufi_query_aggregate_db(const char *indexroot, const char *threads, c
argv[argc++] = indexroot;
argv[argc] = NULL;

const pid_t pid = fork();
if (pid == -1) {
const int err = errno;
*errmsg = sqlite3_mprintf("Failed to fork: %s (%d)", strerror(err), err);
goto error;
size_t len = 0;
for(int i = 0; i < argc; i++) {
len += strlen(argv[i]) + 3; /* + 2 for quotes around each argument + 1 for space between args */
}

/* child */
if (pid == 0) {
/* using execvp to allow for gufi_query executable to be selected by PATH */
const int rc = execvp("gufi_query", (char * const *) argv);
if (rc != 0) {
const int err = errno;
fprintf(stderr, "Failed to start gufi_query: %s (%d)\n", strerror(err), err);
sqlite3_free(*outdb);
*outdb = NULL;
exit(EXIT_FAILURE); /* child needs to exit here and now */
}
/* can't get here */
/* convert array of args to single string */
char *cmd = malloc(len + 1);
char *curr = cmd;
for(int i = 0; i < argc; i++) {
/* FIXME: this should use single quotes to avoid potentially processing variables, but needs to be double quotes to handle strings in SQLite properly */
curr += snprintf(curr, len + 1 - (curr - cmd), "\"%s\" ", argv[i]);
}

/* parent */
int status = EXIT_SUCCESS;
const int rc = waitpid(pid, &status, 0);
if (rc != pid) {
/* pass command to popen */
FILE *out = popen(cmd, "re");
if (!out) {
const int err = errno;
*errmsg = sqlite3_mprintf("Failed to wait for gufi_query: %s (%d)\n", strerror(err), err);
goto error;
*errmsg = sqlite3_mprintf("popen failed: %s (%d)", strerror(err), err);
return SQLITE_ERROR;
}

if (status != EXIT_SUCCESS) {
*errmsg = sqlite3_mprintf("gufi_query returned error: %d\n", status);
goto error;
}
*output = out;

return SQLITE_OK;

error:
remove(*outdb); /* not checking for error */
sqlite3_free(*outdb);
*outdb = NULL;
return SQLITE_ERROR;
}

typedef struct gufi_vtab {
Expand All @@ -258,10 +224,12 @@ typedef struct gufi_vtab {

typedef struct gufi_vtab_cursor {
sqlite3_vtab_cursor base;
sqlite3 *db; /* the aggregated results database file */
sqlite3_stmt *stmt; /* compiled SQL pulling from aggregate database */

FILE *output; /* result of popen */
char *row; /* current row */
ssize_t len; /* length of current row */

sqlite_int64 rowid; /* current row id */
int res; /* previous sqlite3_step return code */
} gufi_vtab_cursor;

/* generic connect function */
Expand Down Expand Up @@ -292,15 +260,14 @@ static int gufi_vtConnect(sqlite3 *db,
/* positional arguments to virtual table/table-valued function */
#define GUFI_VT_COLUMN_INDEXROOT 0
#define GUFI_VT_COLUMN_THREADS 1
#define GUFI_VT_COLUMN_OUTPUT_PREFIX 2
#define GUFI_VT_COLUMN_I 3
#define GUFI_VT_COLUMN_T 4
#define GUFI_VT_COLUMN_S 5
#define GUFI_VT_COLUMN_E 6
#define GUFI_VT_COLUMN_K 7
#define GUFI_VT_COLUMN_J 8
#define GUFI_VT_COLUMN_G 9
#define GUFI_VT_COLUMN_F 10
#define GUFI_VT_COLUMN_I 2
#define GUFI_VT_COLUMN_T 3
#define GUFI_VT_COLUMN_S 4
#define GUFI_VT_COLUMN_E 5
#define GUFI_VT_COLUMN_K 6
#define GUFI_VT_COLUMN_J 7
#define GUFI_VT_COLUMN_G 8
#define GUFI_VT_COLUMN_F 9

#define GUFI_VT_HIDDEN_COLUMNS ", indexroot TEXT HIDDEN, threads INT64 HIDDEN, output_prefix TEXT HIDDEN" \
", I TEXT HIDDEN" \
Expand All @@ -320,8 +287,9 @@ static int gufi_vtConnect(sqlite3 *db,
#define INTERMEDIATE "intermediate"
#define AGGREGATE "aggregate"

#define SELECT_FROM(name, extra) ("INSERT INTO " INTERMEDIATE " SELECT * FROM " name ";" extra)
#define INSERT_AGG ("INSERT INTO " AGGREGATE " SELECT * FROM " INTERMEDIATE ";")
#define SELECT_FROM(name, extra) ("INSERT INTO " INTERMEDIATE " SELECT * FROM " name ";" extra)
#define INSERT_AGG ("INSERT INTO " AGGREGATE " SELECT * FROM " INTERMEDIATE ";")
#define SELECT_AGG ("SELECT * FROM " AGGREGATE ";")

/* generate xConnect function for each virtual table */
#define gufi_vt_XConnect(name, abbrev, t, s, e) \
Expand All @@ -342,7 +310,7 @@ static int gufi_vtConnect(sqlite3 *db,
.E = e?SELECT_FROM(name, ""):NULL, \
.K = name ##_SCHEMA(AGGREGATE, ""), \
.J = INSERT_AGG, \
.G = NULL, \
.G = SELECT_AGG, \
.F = NULL, \
}; \
\
Expand Down Expand Up @@ -409,12 +377,8 @@ static int gufi_vtOpen(sqlite3_vtab *p, sqlite3_vtab_cursor **ppCursor) {

static int gufi_vtClose(sqlite3_vtab_cursor *cur) {
gufi_vtab_cursor *pCur = (gufi_vtab_cursor *) cur;
if (pCur->stmt) {
sqlite3_finalize(pCur->stmt);
}
if (pCur->db) {
sqlite3_close(pCur->db);
}
free(pCur->row);
pCur->row = NULL;
sqlite3_free(cur);
return SQLITE_OK;
}
Expand All @@ -431,7 +395,6 @@ static int gufi_vtFilter(sqlite3_vtab_cursor *cur,
/* indexroot must be present */
const char *indexroot = (const char *) sqlite3_value_text(argv[GUFI_VT_COLUMN_INDEXROOT]);
const char *threads = NULL;
const char *output_prefix = NULL;

if (argc > GUFI_VT_COLUMN_THREADS) {
/* passing NULL in the SQL will result in a NULL pointer */
Expand All @@ -453,8 +416,6 @@ static int gufi_vtFilter(sqlite3_vtab_cursor *cur,
} \
}

set_str(argc, argv, GUFI_VT_COLUMN_OUTPUT_PREFIX, output_prefix);

/* change default queries if they were provided */
set_str(argc, argv, GUFI_VT_COLUMN_I, vtab->sql.I);
set_str(argc, argv, GUFI_VT_COLUMN_T, vtab->sql.T);
Expand All @@ -465,108 +426,132 @@ static int gufi_vtFilter(sqlite3_vtab_cursor *cur,
set_str(argc, argv, GUFI_VT_COLUMN_G, vtab->sql.G);
set_str(argc, argv, GUFI_VT_COLUMN_F, vtab->sql.F);

/* run gufi_query to get aggregate results */
/* kick off gufi_query */
const int rc = gufi_query_aggregate_db(indexroot, threads, &vtab->sql,
output_prefix, &vtab->dbname, &vtab->base.zErrMsg);

if (rc != EXIT_SUCCESS) {
/* output file has already been removed */
&pCur->output, &vtab->base.zErrMsg);
if (rc != SQLITE_OK) {
return SQLITE_ERROR;
}

/* open the aggregate db file */
if (sqlite3_open_v2(vtab->dbname, &pCur->db, SQLITE_OPEN_READONLY, GUFI_SQLITE_VFS) != SQLITE_OK) {
sqlite3_free(vtab->base.zErrMsg);
vtab->base.zErrMsg = sqlite3_mprintf("Could not open aggregate db %s", vtab->dbname);
goto error;
}
pCur->rowid = 0;
pCur->row = NULL;

/* set up SQL for retreiving data from results table */
static const char SELECT_AGG[] = "SELECT * FROM " AGGREGATE ";";
if (sqlite3_prepare_v2(pCur->db, SELECT_AGG, sizeof(SELECT_AGG), &pCur->stmt, NULL) != SQLITE_OK) {
/* wait for first row */
size_t len = 0;
pCur->len = getline(&pCur->row, &len, pCur->output);

if (pCur->len < 0) { /* failed or reached EOF */
const int err = errno;
sqlite3_free(cur->pVtab->zErrMsg);
cur->pVtab->zErrMsg = sqlite3_mprintf("Could not prepare SQL for aggregate db: %s",
sqlite3_errmsg(pCur->db));
goto error;
cur->pVtab->zErrMsg = sqlite3_mprintf("Could not read first result: %s (%d)",
strerror(err), err);
return SQLITE_ERROR;
}

pCur->rowid = 0;
pCur->res = sqlite3_step(pCur->stmt); /* go to first result */
if ((pCur->res != SQLITE_ROW) && (pCur->res != SQLITE_DONE)) {
sqlite3_free(cur->pVtab->zErrMsg);
cur->pVtab->zErrMsg = sqlite3_mprintf("Could not prepare step into aggregate results: %s",
sqlite3_errmsg(pCur->db));
goto error;
if (pCur->row[pCur->len - 1] == '\n') {
pCur->row[pCur->len - 1] = '\0';
pCur->len--;
}

return SQLITE_OK;

error:
remove(vtab->dbname);
sqlite3_free(vtab->dbname);
vtab->dbname = NULL;
return SQLITE_ERROR;
}

static int gufi_vtNext(sqlite3_vtab_cursor *cur) {
gufi_vtab_cursor *pCur = (gufi_vtab_cursor *) cur;
pCur->res = sqlite3_step(pCur->stmt);
if ((pCur->res != SQLITE_ROW) && (pCur->res != SQLITE_DONE)) {
return SQLITE_ERROR;

size_t len = 0;
free(pCur->row);
pCur->row = NULL;
pCur->len = getline(&pCur->row, &len, pCur->output);

/* no more to read */
if (pCur->len == -1) {
return SQLITE_OK;
}
if (pCur->res == SQLITE_ROW) {
pCur->rowid++;

/* remove trailing newline */
if (pCur->row[pCur->len - 1] == '\n') {
pCur->len--;
}

pCur->rowid++;

return SQLITE_OK;
}

static int gufi_vtEof(sqlite3_vtab_cursor *cur) {
gufi_vtab_cursor *pCur = (gufi_vtab_cursor *) cur;
const int eof = (pCur->res != SQLITE_ROW);

const int eof = (pCur->len < 1);
if (eof) {
sqlite3_reset(pCur->stmt);
pclose(pCur->output);
pCur->output = NULL;
}

return eof;
}

/*
 * Locate the n-th (0-based) column of a delimited record.
 *
 * str     - start of the record; may be NULL
 * len     - number of bytes of str to examine
 * c       - single-character column delimiter
 * n       - index of the column to find
 * ptr     - receives the address of the column's first byte,
 *           or NULL when the column was not found
 * col_len - receives the column length in bytes (0 when not found)
 *
 * Returns 0 when the column exists (it may be empty, e.g. between two
 * adjacent delimiters or after a trailing delimiter) and 1 when str is
 * NULL or the record has fewer than n + 1 columns.
 *
 * The record is not modified and nothing is copied: *ptr points into str.
 */
static int find_col(const char *str, const size_t len, const char c, const size_t n,
                    const char **ptr, size_t *col_len) {
    const char *field = NULL;
    size_t field_len = 0;
    int rc = 1;

    if (str) {
        size_t idx = 0;   /* index of the column currently being scanned */
        size_t begin = 0; /* offset of the first byte of that column */
        size_t pos = 0;   /* current scan offset */

        for (;;) {
            /* the end of the record terminates the last column just like a delimiter does */
            const int at_end = (pos >= len);

            if (at_end || (str[pos] == c)) {
                if (idx == n) {
                    field = str + begin;
                    field_len = pos - begin;
                    rc = 0;
                    break;
                }

                /* ran out of input before reaching the requested column */
                if (at_end) {
                    break;
                }

                /* step over the delimiter and start the next column */
                idx++;
                begin = pos + 1;
            }

            pos++;
        }
    }

    /* outputs are written exactly once, after the scan is complete */
    *ptr = field;
    *col_len = field_len;

    return rc;
}

static int gufi_vtColumn(sqlite3_vtab_cursor *cur,
sqlite3_context *ctx,
int i) {
int N) {
gufi_vtab_cursor *pCur = (gufi_vtab_cursor *) cur;

const int coltype = sqlite3_column_type(pCur->stmt, i);
switch (coltype) {
case SQLITE_INTEGER:
{
const int col = sqlite3_column_int(pCur->stmt, i);
sqlite3_result_int(ctx, col);
}
break;
case SQLITE_FLOAT:
{
const double col = sqlite3_column_int(pCur->stmt, i);
sqlite3_result_double(ctx, col);
}
break;
case SQLITE_TEXT:
{
const char *col = (char *) sqlite3_column_text(pCur->stmt, i);
const int bytes = sqlite3_column_bytes(pCur->stmt, i);
sqlite3_result_text(ctx, col, bytes, SQLITE_TRANSIENT);
}
break;
case SQLITE_BLOB:
{
const void *col = sqlite3_column_blob(pCur->stmt, i);
const int bytes = sqlite3_column_bytes(pCur->stmt, i);
sqlite3_result_blob(ctx, col, bytes, SQLITE_TRANSIENT);
}
break;
case SQLITE_NULL:
default:
sqlite3_result_null(ctx);
break;
const char *col = NULL;
size_t len = 0;
find_col(pCur->row, strlen(pCur->row), delim[0], N, &col, &len);

if (col && len) {
sqlite3_result_text(ctx, col, len, SQLITE_TRANSIENT);
}
else {
sqlite3_result_null(ctx);
}

return SQLITE_OK;
Expand Down
Loading

0 comments on commit 83e84b6

Please sign in to comment.