From fe16f39ee32150ca4cd02ba2d7ebcb3cab32760c Mon Sep 17 00:00:00 2001 From: Sein Coray Date: Thu, 21 Nov 2024 14:27:51 +0100 Subject: [PATCH] added multicolumn aggregation to DBA and improved three essential parts which suffer from many chunks (#1069) * Adding loops to scan through lines to support importing hashes longer than 1024 bytes * added multicolumn aggregation to DBA and improved three essential parts which suffer from many chunks * applied requested changes (constants and query merge) for pull request #1069 --------- Co-authored-by: Jesse van Zutphen --- src/dba/AbstractModelFactory.class.php | 25 ++++++++++++ src/dba/Aggregation.class.php | 42 ++++++++++++++++++++ src/dba/init.php | 1 + src/inc/Util.class.php | 53 ++++++++++++++------------ src/inc/api/APIGetChunk.class.php | 18 +++++---- src/inc/utils/TaskUtils.class.php | 20 +++++----- 6 files changed, 118 insertions(+), 41 deletions(-) create mode 100755 src/dba/Aggregation.class.php diff --git a/src/dba/AbstractModelFactory.class.php b/src/dba/AbstractModelFactory.class.php index e1326d16f..8eb63ba98 100755 --- a/src/dba/AbstractModelFactory.class.php +++ b/src/dba/AbstractModelFactory.class.php @@ -411,6 +411,31 @@ public function minMaxFilter($options, $sumColumn, $op) { return $row['column_' . strtolower($op)]; } + public function multicolAggregationFilter($options, $aggregations) { + //$options: as usual + //$aggregations: array of Aggregation objects + + $elements = []; + foreach ($aggregations as $aggregation) { + $elements[] = $aggregation->getQueryString(); + } + + $query = "SELECT " . join(",", $elements); + $query = $query . " FROM " . 
$this->getModelTable(); + + $vals = array(); + + if (array_key_exists("filter", $options)) { + $query .= $this->applyFilters($vals, $options['filter']); + } + + $dbh = self::getDB(); + $stmt = $dbh->prepare($query); + $stmt->execute($vals); + + return $stmt->fetch(PDO::FETCH_ASSOC); + } + public function sumFilter($options, $sumColumn) { $query = "SELECT SUM($sumColumn) AS sum "; $query = $query . " FROM " . $this->getModelTable(); diff --git a/src/dba/Aggregation.class.php b/src/dba/Aggregation.class.php new file mode 100755 index 000000000..be775c434 --- /dev/null +++ b/src/dba/Aggregation.class.php @@ -0,0 +1,42 @@ +column = $column; + $this->function = $function; + $this->factory = $factory; + } + + function getName() { + return strtolower($this->function) . "_" . $this->column; + } + + function getQueryString($table = "") { + if ($table != "") { + $table = $table . "."; + } + if ($this->factory != null) { + $table = $this->factory->getModelTable() . "."; + } + + return $this->function . "(" . $table . $this->column . ") AS " . $this->getName(); + } +} + + diff --git a/src/dba/init.php b/src/dba/init.php index eb8ef1a11..834f846e7 100644 --- a/src/dba/init.php +++ b/src/dba/init.php @@ -9,6 +9,7 @@ require_once(dirname(__FILE__) . "/AbstractModel.class.php"); require_once(dirname(__FILE__) . "/AbstractModelFactory.class.php"); +require_once(dirname(__FILE__) . "/Aggregation.class.php"); require_once(dirname(__FILE__) . "/Filter.class.php"); require_once(dirname(__FILE__) . "/Order.class.php"); require_once(dirname(__FILE__) . 
"/Join.class.php"); diff --git a/src/inc/Util.class.php b/src/inc/Util.class.php index e64073a9a..1a659eb03 100755 --- a/src/inc/Util.class.php +++ b/src/inc/Util.class.php @@ -1,5 +1,6 @@ getId(), "="); - $chunks = Factory::getChunkFactory()->filter([Factory::FILTER => $qF]); - $progress = 0; - $cracked = 0; - $maxTime = 0; - $totalTimeSpent = 0; - $speed = 0; - foreach ($chunks as $chunk) { - if ($chunk->getDispatchTime() > 0 && $chunk->getSolveTime() > 0) { - $totalTimeSpent += $chunk->getSolveTime() - $chunk->getDispatchTime(); - } - $progress += $chunk->getCheckpoint() - $chunk->getSkip(); - $cracked += $chunk->getCracked(); - if ($chunk->getDispatchTime() > $maxTime) { - $maxTime = $chunk->getDispatchTime(); - } - if ($chunk->getSolveTime() > $maxTime) { - $maxTime = $chunk->getSolveTime(); - } - $speed += $chunk->getSpeed(); - } + $qF1 = new QueryFilter(Chunk::TASK_ID, $task->getId(), "="); + + $agg1 = new Aggregation(Chunk::CHECKPOINT, Aggregation::SUM); + $agg2 = new Aggregation(Chunk::SKIP, Aggregation::SUM); + $agg3 = new Aggregation(Chunk::CRACKED, Aggregation::SUM); + $agg4 = new Aggregation(Chunk::SPEED, Aggregation::SUM); + $agg5 = new Aggregation(Chunk::DISPATCH_TIME, Aggregation::MAX); + $agg6 = new Aggregation(Chunk::SOLVE_TIME, Aggregation::MAX); + $agg7 = new Aggregation(Chunk::CHUNK_ID, Aggregation::COUNT); + $agg8 = new Aggregation(Chunk::SOLVE_TIME, Aggregation::SUM); + $agg9 = new Aggregation(Chunk::DISPATCH_TIME, Aggregation::SUM); + + $results = Factory::getChunkFactory()->multicolAggregationFilter([Factory::FILTER => $qF1], [$agg1, $agg2, $agg3, $agg4, $agg5, $agg6, $agg7, $agg8, $agg9]); + + $totalTimeSpent = $results[$agg8->getName()] - $results[$agg9->getName()]; + + $progress = $results[$agg1->getName()] - $results[$agg2->getName()]; + $cracked = $results[$agg3->getName()]; + $speed = $results[$agg4->getName()]; + $maxTime = max($results[$agg5->getName()], $results[$agg6->getName()]); + $numChunks = $results[$agg7->getName()]; 
$isActive = false; if (time() - $maxTime < SConfig::getInstance()->getVal(DConfig::CHUNK_TIMEOUT) && ($progress < $task->getKeyspace() || $task->getUsePreprocessor() && $task->getKeyspace() == DPrince::PRINCE_KEYSPACE)) { $isActive = true; } - return array($progress, $cracked, $isActive, sizeof($chunks), ($totalTimeSpent > 0) ? round($cracked * 60 / $totalTimeSpent, 2) : 0, $speed); + return array($progress, $cracked, $isActive, $numChunks, ($totalTimeSpent > 0) ? round($cracked * 60 / $totalTimeSpent, 2) : 0, $speed); } /** @@ -438,8 +439,12 @@ public static function getFileInfo($task, $accessGroups) { */ public static function getChunkInfo($task) { $qF = new QueryFilter(Chunk::TASK_ID, $task->getId(), "="); - $cracked = Factory::getChunkFactory()->sumFilter([Factory::FILTER => $qF], Chunk::CRACKED); - $numChunks = Factory::getChunkFactory()->countFilter([Factory::FILTER => $qF]); + $agg1 = new Aggregation(Chunk::CRACKED, "SUM"); + $agg2 = new Aggregation(Chunk::CHUNK_ID, "COUNT"); + $results = Factory::getChunkFactory()->multicolAggregationFilter([Factory::FILTER => $qF], [$agg1, $agg2]); + + $cracked = $results[$agg1->getName()]; + $numChunks = $results[$agg2->getName()]; $qF = new QueryFilter(Assignment::TASK_ID, $task->getId(), "="); $numAssignments = Factory::getAssignmentFactory()->countFilter([Factory::FILTER => $qF]); diff --git a/src/inc/api/APIGetChunk.class.php b/src/inc/api/APIGetChunk.class.php index 79af89730..5da386640 100644 --- a/src/inc/api/APIGetChunk.class.php +++ b/src/inc/api/APIGetChunk.class.php @@ -66,8 +66,10 @@ public function execute($QUERY = array()) { DServerLog::log(DServerLog::TRACE, "Agent is inactive!", [$this->agent]); $this->sendErrorResponse(PActions::GET_CHUNK, "Agent is inactive!"); } + + $LOCKFILE = LOCK::CHUNKING.$task->getId(); - LockUtils::get(Lock::CHUNKING); + LockUtils::get($LOCKFILE); DServerLog::log(DServerLog::TRACE, "Retrieved lock for chunking!", [$this->agent]); $task = 
Factory::getTaskFactory()->get($task->getId()); Factory::getAgentFactory()->getDB()->beginTransaction(); @@ -76,7 +78,7 @@ public function execute($QUERY = array()) { if ($task == null) { // agent needs a new task DServerLog::log(DServerLog::DEBUG, "Task is fully dispatched", [$this->agent]); Factory::getAgentFactory()->getDB()->commit(); - LockUtils::release(Lock::CHUNKING); + LockUtils::release($LOCKFILE); DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]); $this->sendResponse(array( PResponseGetChunk::ACTION => PActions::GET_CHUNK, @@ -93,14 +95,14 @@ public function execute($QUERY = array()) { // this is a special case where this task is either not allowed anymore, or it has priority 0 so it doesn't get auto assigned if (!AccessUtils::agentCanAccessTask($this->agent, $task)) { Factory::getAgentFactory()->getDB()->commit(); - LockUtils::release(Lock::CHUNKING); + LockUtils::release($LOCKFILE); DServerLog::log(DServerLog::INFO, "Not allowed to work on requested task", [$this->agent, $task]); DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]); $this->sendErrorResponse(PActions::GET_CHUNK, "Not allowed to work on this task!"); } if (TaskUtils::isSaturatedByOtherAgents($task, $this->agent)) { Factory::getAgentFactory()->getDB()->commit(); - LockUtils::release(Lock::CHUNKING); + LockUtils::release($LOCKFILE); DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]); $this->sendErrorResponse(PActions::GET_CHUNK, "Task already saturated by other agents, no other task available!"); } @@ -108,7 +110,7 @@ public function execute($QUERY = array()) { if (TaskUtils::isSaturatedByOtherAgents($task, $this->agent)) { Factory::getAgentFactory()->getDB()->commit(); - LockUtils::release(Lock::CHUNKING); + LockUtils::release($LOCKFILE); DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]); $this->sendErrorResponse(PActions::GET_CHUNK, "Task already saturated by other 
agents, other tasks available!"); } @@ -119,7 +121,7 @@ public function execute($QUERY = array()) { if ($bestTask->getId() != $task->getId()) { Factory::getAgentFactory()->getDB()->commit(); DServerLog::log(DServerLog::INFO, "Task with higher priority available!", [$this->agent]); - LockUtils::release(Lock::CHUNKING); + LockUtils::release($LOCKFILE); DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]); $this->sendErrorResponse(PActions::GET_CHUNK, "Task with higher priority available!"); } @@ -150,7 +152,7 @@ public function execute($QUERY = array()) { if ($chunk == null) { DServerLog::log(DServerLog::DEBUG, "Could not create a chunk, task is fully dispatched", [$this->agent, $task]); Factory::getAgentFactory()->getDB()->commit(); - LockUtils::release(Lock::CHUNKING); + LockUtils::release($LOCKFILE); DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]); $this->sendResponse(array( PResponseGetChunk::ACTION => PActions::GET_CHUNK, @@ -171,7 +173,7 @@ protected function sendChunk($chunk) { return; // this can be safely done before the commit/release, because the only sendChunk which comes really at the end check for null before, so a lock which is not released cannot happen } Factory::getAgentFactory()->getDB()->commit(); - LockUtils::release(Lock::CHUNKING); + LockUtils::release(Lock::CHUNKING.$chunk->getTaskId()); DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]); $this->sendResponse(array( PResponseGetChunk::ACTION => PActions::GET_CHUNK, diff --git a/src/inc/utils/TaskUtils.class.php b/src/inc/utils/TaskUtils.class.php index b8a4850ba..5e765e15d 100644 --- a/src/inc/utils/TaskUtils.class.php +++ b/src/inc/utils/TaskUtils.class.php @@ -1152,18 +1152,20 @@ public static function checkTask($task, $agent = null) { else if ($task->getUsePreprocessor() && $task->getKeyspace() == DPrince::PRINCE_KEYSPACE) { return $task; } + + $qF1 = new QueryFilter(Chunk::TASK_ID, $task->getId(), 
"="); + $qF2 = new QueryFilter(Chunk::PROGRESS, 10000, ">="); + $sum = Factory::getChunkFactory()->sumFilter([Factory::FILTER => [$qF1, $qF2]], Chunk::LENGTH); + + $dispatched = $task->getSkipKeyspace() + $sum; + $completed = $task->getSkipKeyspace() + $sum; // check chunks - $qF = new QueryFilter(Chunk::TASK_ID, $task->getId(), "="); - $chunks = Factory::getChunkFactory()->filter([Factory::FILTER => $qF]); - $dispatched = $task->getSkipKeyspace(); - $completed = $task->getSkipKeyspace(); + $qF1 = new QueryFilter(Chunk::TASK_ID, $task->getId(), "="); + $qF2 = new QueryFilter(Chunk::PROGRESS, 10000, "<"); + $chunks = Factory::getChunkFactory()->filter([Factory::FILTER => [$qF1, $qF2]]); foreach ($chunks as $chunk) { - if ($chunk->getProgress() >= 10000) { - $dispatched += $chunk->getLength(); - $completed += $chunk->getLength(); - } - else if ($chunk->getAgentId() == null) { + if ($chunk->getAgentId() == null) { return $task; // at least one chunk is not assigned } else if (time() - max($chunk->getSolveTime(), $chunk->getDispatchTime()) > SConfig::getInstance()->getVal(DConfig::AGENT_TIMEOUT)) {