Skip to content

Commit

Permalink
added multicolumn aggregation to DBA and improved three essential par…
Browse files Browse the repository at this point in the history
…ts which suffer from many chunks (#1069)

* Adding loops to scan through lines to support importing hashes longer then 1024 bytes

* added multicolumn aggregation to DBA and improved three essential parts which suffer from many chunks

* applied requested changes (constants and query merge) for pull request #1069

---------

Co-authored-by: Jesse van Zutphen <[email protected]>
  • Loading branch information
s3inlc and jessevz authored Nov 21, 2024
1 parent fd5ba50 commit fe16f39
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 41 deletions.
25 changes: 25 additions & 0 deletions src/dba/AbstractModelFactory.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,31 @@ public function minMaxFilter($options, $sumColumn, $op) {
return $row['column_' . strtolower($op)];
}

public function multicolAggregationFilter($options, $aggregations) {
//$options: as usual
//$columns: array of Aggregation objects

$elements = [];
foreach ($aggregations as $aggregation) {
$elements[] = $aggregation->getQueryString();
}

$query = "SELECT " . join(",", $elements);
$query = $query . " FROM " . $this->getModelTable();

$vals = array();

if (array_key_exists("filter", $options)) {
$query .= $this->applyFilters($vals, $options['filter']);
}

$dbh = self::getDB();
$stmt = $dbh->prepare($query);
$stmt->execute($vals);

return $stmt->fetch(PDO::FETCH_ASSOC);
}

public function sumFilter($options, $sumColumn) {
$query = "SELECT SUM($sumColumn) AS sum ";
$query = $query . " FROM " . $this->getModelTable();
Expand Down
42 changes: 42 additions & 0 deletions src/dba/Aggregation.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

namespace DBA;

use DBA\AbstractModelFactory;

class Aggregation {
private $column;
private $function;
/**
* @var AbstractModelFactory
*/
private $factory;

const SUM = "SUM";
const MAX = "MAX";
const MIN = "MIN";
const COUNT = "COUNT";

function __construct($column, $function, $factory = null) {
$this->column = $column;
$this->function = $function;
$this->factory = $factory;
}

function getName() {
return strtolower($this->function) . "_" . $this->column;
}

function getQueryString($table = "") {
if ($table != "") {
$table = $table . ".";
}
if ($this->factory != null) {
$table = $this->factory->getModelTable() . ".";
}

return $this->function . "(" . $table . $this->column . ") AS " . $this->getName();
}
}


1 change: 1 addition & 0 deletions src/dba/init.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

require_once(dirname(__FILE__) . "/AbstractModel.class.php");
require_once(dirname(__FILE__) . "/AbstractModelFactory.class.php");
require_once(dirname(__FILE__) . "/Aggregation.class.php");
require_once(dirname(__FILE__) . "/Filter.class.php");
require_once(dirname(__FILE__) . "/Order.class.php");
require_once(dirname(__FILE__) . "/Join.class.php");
Expand Down
53 changes: 29 additions & 24 deletions src/inc/Util.class.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?php

use DBA\Aggregation;
use DBA\AbstractModel;
use DBA\AccessGroup;
use DBA\AccessGroupUser;
Expand Down Expand Up @@ -377,33 +378,33 @@ public static function insertFile($path, $name, $type, $accessGroupId) {
* @return array
*/
public static function getTaskInfo($task) {
$qF = new QueryFilter(Chunk::TASK_ID, $task->getId(), "=");
$chunks = Factory::getChunkFactory()->filter([Factory::FILTER => $qF]);
$progress = 0;
$cracked = 0;
$maxTime = 0;
$totalTimeSpent = 0;
$speed = 0;
foreach ($chunks as $chunk) {
if ($chunk->getDispatchTime() > 0 && $chunk->getSolveTime() > 0) {
$totalTimeSpent += $chunk->getSolveTime() - $chunk->getDispatchTime();
}
$progress += $chunk->getCheckpoint() - $chunk->getSkip();
$cracked += $chunk->getCracked();
if ($chunk->getDispatchTime() > $maxTime) {
$maxTime = $chunk->getDispatchTime();
}
if ($chunk->getSolveTime() > $maxTime) {
$maxTime = $chunk->getSolveTime();
}
$speed += $chunk->getSpeed();
}
$qF1 = new QueryFilter(Chunk::TASK_ID, $task->getId(), "=");

$agg1 = new Aggregation(Chunk::CHECKPOINT, Aggregation::SUM);
$agg2 = new Aggregation(Chunk::SKIP, Aggregation::SUM);
$agg3 = new Aggregation(Chunk::CRACKED, Aggregation::SUM);
$agg4 = new Aggregation(Chunk::SPEED, Aggregation::SUM);
$agg5 = new Aggregation(Chunk::DISPATCH_TIME, Aggregation::MAX);
$agg6 = new Aggregation(Chunk::SOLVE_TIME, Aggregation::MAX);
$agg7 = new Aggregation(Chunk::CHUNK_ID, Aggregation::COUNT);
$agg8 = new Aggregation(Chunk::SOLVE_TIME, Aggregation::SUM);
$agg9 = new Aggregation(Chunk::DISPATCH_TIME, Aggregation::SUM);

$results = Factory::getChunkFactory()->multicolAggregationFilter([Factory::FILTER => $qF1], [$agg1, $agg2, $agg3, $agg4, $agg5, $agg6, $agg7, $agg8, $agg9]);

$totalTimeSpent = $results[$agg8->getName()] - $results[$agg9->getName()];

$progress = $results[$agg1->getName()] - $results[$agg2->getName()];
$cracked = $results[$agg3->getName()];
$speed = $results[$agg4->getName()];
$maxTime = max($results[$agg5->getName()], $results[$agg6->getName()]);
$numChunks = $results[$agg7->getName()];

$isActive = false;
if (time() - $maxTime < SConfig::getInstance()->getVal(DConfig::CHUNK_TIMEOUT) && ($progress < $task->getKeyspace() || $task->getUsePreprocessor() && $task->getKeyspace() == DPrince::PRINCE_KEYSPACE)) {
$isActive = true;
}
return array($progress, $cracked, $isActive, sizeof($chunks), ($totalTimeSpent > 0) ? round($cracked * 60 / $totalTimeSpent, 2) : 0, $speed);
return array($progress, $cracked, $isActive, $numChunks, ($totalTimeSpent > 0) ? round($cracked * 60 / $totalTimeSpent, 2) : 0, $speed);
}

/**
Expand Down Expand Up @@ -438,8 +439,12 @@ public static function getFileInfo($task, $accessGroups) {
*/
public static function getChunkInfo($task) {
$qF = new QueryFilter(Chunk::TASK_ID, $task->getId(), "=");
$cracked = Factory::getChunkFactory()->sumFilter([Factory::FILTER => $qF], Chunk::CRACKED);
$numChunks = Factory::getChunkFactory()->countFilter([Factory::FILTER => $qF]);
$agg1 = new Aggregation(Chunk::CRACKED, "SUM");
$agg2 = new Aggregation(Chunk::CHUNK_ID, "COUNT");
$results = Factory::getChunkFactory()->multicolAggregationFilter([Factory::FILTER => $qF], [$agg1, $agg2]);

$cracked = $results[$agg1->getName()];
$numChunks = $results[$agg2->getName()];

$qF = new QueryFilter(Assignment::TASK_ID, $task->getId(), "=");
$numAssignments = Factory::getAssignmentFactory()->countFilter([Factory::FILTER => $qF]);
Expand Down
18 changes: 10 additions & 8 deletions src/inc/api/APIGetChunk.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ public function execute($QUERY = array()) {
DServerLog::log(DServerLog::TRACE, "Agent is inactive!", [$this->agent]);
$this->sendErrorResponse(PActions::GET_CHUNK, "Agent is inactive!");
}

$LOCKFILE = LOCK::CHUNKING.$task->getId();

LockUtils::get(Lock::CHUNKING);
LockUtils::get($LOCKFILE);
DServerLog::log(DServerLog::TRACE, "Retrieved lock for chunking!", [$this->agent]);
$task = Factory::getTaskFactory()->get($task->getId());
Factory::getAgentFactory()->getDB()->beginTransaction();
Expand All @@ -76,7 +78,7 @@ public function execute($QUERY = array()) {
if ($task == null) { // agent needs a new task
DServerLog::log(DServerLog::DEBUG, "Task is fully dispatched", [$this->agent]);
Factory::getAgentFactory()->getDB()->commit();
LockUtils::release(Lock::CHUNKING);
LockUtils::release($LOCKFILE);
DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]);
$this->sendResponse(array(
PResponseGetChunk::ACTION => PActions::GET_CHUNK,
Expand All @@ -93,22 +95,22 @@ public function execute($QUERY = array()) {
// this is a special case where this task is either not allowed anymore, or it has priority 0 so it doesn't get auto assigned
if (!AccessUtils::agentCanAccessTask($this->agent, $task)) {
Factory::getAgentFactory()->getDB()->commit();
LockUtils::release(Lock::CHUNKING);
LockUtils::release($LOCKFILE);
DServerLog::log(DServerLog::INFO, "Not allowed to work on requested task", [$this->agent, $task]);
DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]);
$this->sendErrorResponse(PActions::GET_CHUNK, "Not allowed to work on this task!");
}
if (TaskUtils::isSaturatedByOtherAgents($task, $this->agent)) {
Factory::getAgentFactory()->getDB()->commit();
LockUtils::release(Lock::CHUNKING);
LockUtils::release($LOCKFILE);
DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]);
$this->sendErrorResponse(PActions::GET_CHUNK, "Task already saturated by other agents, no other task available!");
}
}

if (TaskUtils::isSaturatedByOtherAgents($task, $this->agent)) {
Factory::getAgentFactory()->getDB()->commit();
LockUtils::release(Lock::CHUNKING);
LockUtils::release($LOCKFILE);
DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]);
$this->sendErrorResponse(PActions::GET_CHUNK, "Task already saturated by other agents, other tasks available!");
}
Expand All @@ -119,7 +121,7 @@ public function execute($QUERY = array()) {
if ($bestTask->getId() != $task->getId()) {
Factory::getAgentFactory()->getDB()->commit();
DServerLog::log(DServerLog::INFO, "Task with higher priority available!", [$this->agent]);
LockUtils::release(Lock::CHUNKING);
LockUtils::release($LOCKFILE);
DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]);
$this->sendErrorResponse(PActions::GET_CHUNK, "Task with higher priority available!");
}
Expand Down Expand Up @@ -150,7 +152,7 @@ public function execute($QUERY = array()) {
if ($chunk == null) {
DServerLog::log(DServerLog::DEBUG, "Could not create a chunk, task is fully dispatched", [$this->agent, $task]);
Factory::getAgentFactory()->getDB()->commit();
LockUtils::release(Lock::CHUNKING);
LockUtils::release($LOCKFILE);
DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]);
$this->sendResponse(array(
PResponseGetChunk::ACTION => PActions::GET_CHUNK,
Expand All @@ -171,7 +173,7 @@ protected function sendChunk($chunk) {
return; // this can be safely done before the commit/release, because the only sendChunk which comes really at the end check for null before, so a lock which is not released cannot happen
}
Factory::getAgentFactory()->getDB()->commit();
LockUtils::release(Lock::CHUNKING);
LockUtils::release(Lock::CHUNKING.$chunk->getTaskId());
DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]);
$this->sendResponse(array(
PResponseGetChunk::ACTION => PActions::GET_CHUNK,
Expand Down
20 changes: 11 additions & 9 deletions src/inc/utils/TaskUtils.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -1152,18 +1152,20 @@ public static function checkTask($task, $agent = null) {
else if ($task->getUsePreprocessor() && $task->getKeyspace() == DPrince::PRINCE_KEYSPACE) {
return $task;
}

$qF1 = new QueryFilter(Chunk::TASK_ID, $task->getId(), "=");
$qF2 = new QueryFilter(Chunk::PROGRESS, 10000, ">=");
$sum = Factory::getChunkFactory()->sumFilter([Factory::FILTER => [$qF1, $qF2]], Chunk::LENGTH);

$dispatched = $task->getSkipKeyspace() + $sum;
$completed = $task->getSkipKeyspace() + $sum;

// check chunks
$qF = new QueryFilter(Chunk::TASK_ID, $task->getId(), "=");
$chunks = Factory::getChunkFactory()->filter([Factory::FILTER => $qF]);
$dispatched = $task->getSkipKeyspace();
$completed = $task->getSkipKeyspace();
$qF1 = new QueryFilter(Chunk::TASK_ID, $task->getId(), "=");
$qF2 = new QueryFilter(Chunk::PROGRESS, 10000, "<");
$chunks = Factory::getChunkFactory()->filter([Factory::FILTER => [$qF1, $qF2]]);
foreach ($chunks as $chunk) {
if ($chunk->getProgress() >= 10000) {
$dispatched += $chunk->getLength();
$completed += $chunk->getLength();
}
else if ($chunk->getAgentId() == null) {
if ($chunk->getAgentId() == null) {
return $task; // at least one chunk is not assigned
}
else if (time() - max($chunk->getSolveTime(), $chunk->getDispatchTime()) > SConfig::getInstance()->getVal(DConfig::AGENT_TIMEOUT)) {
Expand Down

0 comments on commit fe16f39

Please sign in to comment.