Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added multicolumn aggregation to DBA and improved three essential parts which suffer from many chunks #1069

Merged
merged 4 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/dba/AbstractModelFactory.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,31 @@ public function minMaxFilter($options, $sumColumn, $op) {
return $row['column_' . strtolower($op)];
}

public function multicolAggregationFilter($options, $aggregations) {
//$options: as usual
//$columns: array of Aggregation objects

$elements = [];
foreach ($aggregations as $aggregation) {
$elements[] = $aggregation->getQueryString();
}

$query = "SELECT " . join(",", $elements);
$query = $query . " FROM " . $this->getModelTable();

$vals = array();

if (array_key_exists("filter", $options)) {
$query .= $this->applyFilters($vals, $options['filter']);
}

$dbh = self::getDB();
$stmt = $dbh->prepare($query);
$stmt->execute($vals);

return $stmt->fetch(PDO::FETCH_ASSOC);
}

public function sumFilter($options, $sumColumn) {
$query = "SELECT SUM($sumColumn) AS sum ";
$query = $query . " FROM " . $this->getModelTable();
Expand Down
37 changes: 37 additions & 0 deletions src/dba/Aggregation.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

namespace DBA;

use DBA\AbstractModelFactory;

class Aggregation {
private $column;
private $function;
/**
* @var AbstractModelFactory
*/
private $factory;

function __construct($column, $function, $factory = null) {
$this->column = $column;
$this->function = $function;
$this->factory = $factory;
}

function getName() {
return strtolower($this->function) . "_" . $this->column;
}

function getQueryString($table = "") {
if ($table != "") {
$table = $table . ".";
}
if ($this->factory != null) {
$table = $this->factory->getModelTable() . ".";
}

return $this->function . "(" . $table . $this->column . ") AS " . $this->getName();
}
}


1 change: 1 addition & 0 deletions src/dba/init.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

require_once(dirname(__FILE__) . "/AbstractModel.class.php");
require_once(dirname(__FILE__) . "/AbstractModelFactory.class.php");
require_once(dirname(__FILE__) . "/Aggregation.class.php");
require_once(dirname(__FILE__) . "/Filter.class.php");
require_once(dirname(__FILE__) . "/Order.class.php");
require_once(dirname(__FILE__) . "/Join.class.php");
Expand Down
57 changes: 33 additions & 24 deletions src/inc/Util.class.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?php

use DBA\Aggregation;
use DBA\AbstractModel;
use DBA\AccessGroup;
use DBA\AccessGroupUser;
Expand Down Expand Up @@ -377,33 +378,37 @@ public static function insertFile($path, $name, $type, $accessGroupId) {
* @return array
*/
public static function getTaskInfo($task) {
$qF = new QueryFilter(Chunk::TASK_ID, $task->getId(), "=");
$chunks = Factory::getChunkFactory()->filter([Factory::FILTER => $qF]);
$progress = 0;
$cracked = 0;
$maxTime = 0;
$totalTimeSpent = 0;
$speed = 0;
foreach ($chunks as $chunk) {
if ($chunk->getDispatchTime() > 0 && $chunk->getSolveTime() > 0) {
$totalTimeSpent += $chunk->getSolveTime() - $chunk->getDispatchTime();
}
$progress += $chunk->getCheckpoint() - $chunk->getSkip();
$cracked += $chunk->getCracked();
if ($chunk->getDispatchTime() > $maxTime) {
$maxTime = $chunk->getDispatchTime();
}
if ($chunk->getSolveTime() > $maxTime) {
$maxTime = $chunk->getSolveTime();
}
$speed += $chunk->getSpeed();
}
$qF1 = new QueryFilter(Chunk::TASK_ID, $task->getId(), "=");

$qF2 = new QueryFilter(Chunk::DISPATCH_TIME, 0, ">");
$qF3 = new QueryFilter(Chunk::SOLVE_TIME, 0, ">");
$agg1 = new Aggregation(Chunk::SOLVE_TIME, "SUM");
$agg2 = new Aggregation(Chunk::DISPATCH_TIME, "SUM");
$results = Factory::getChunkFactory()->multicolAggregationFilter([Factory::FILTER => [$qF1, $qF2, $qF3]], [$agg1, $agg2]);
jessevz marked this conversation as resolved.
Show resolved Hide resolved

$totalTimeSpent = $results[$agg1->getName()] - $results[$agg2->getName()];

$agg1 = new Aggregation(Chunk::CHECKPOINT, "SUM");
jessevz marked this conversation as resolved.
Show resolved Hide resolved
$agg2 = new Aggregation(Chunk::SKIP, "SUM");
$agg3 = new Aggregation(Chunk::CRACKED, "SUM");
$agg4 = new Aggregation(Chunk::SPEED, "SUM");
$agg5 = new Aggregation(Chunk::DISPATCH_TIME, "MAX");
$agg6 = new Aggregation(Chunk::SOLVE_TIME, "MAX");
$agg7 = new Aggregation(Chunk::CHUNK_ID, "COUNT");

$results = Factory::getChunkFactory()->multicolAggregationFilter([Factory::FILTER => $qF1], [$agg1, $agg2, $agg3, $agg4, $agg5, $agg6, $agg7]);

$progress = $results[$agg1->getName()] - $results[$agg2->getName()];
$cracked = $results[$agg3->getName()];
$speed = $results[$agg4->getName()];
$maxTime = max($results[$agg5->getName()], $results[$agg6->getName()]);
$numChunks = $results[$agg7->getName()];

$isActive = false;
if (time() - $maxTime < SConfig::getInstance()->getVal(DConfig::CHUNK_TIMEOUT) && ($progress < $task->getKeyspace() || $task->getUsePreprocessor() && $task->getKeyspace() == DPrince::PRINCE_KEYSPACE)) {
$isActive = true;
}
return array($progress, $cracked, $isActive, sizeof($chunks), ($totalTimeSpent > 0) ? round($cracked * 60 / $totalTimeSpent, 2) : 0, $speed);
return array($progress, $cracked, $isActive, $numChunks, ($totalTimeSpent > 0) ? round($cracked * 60 / $totalTimeSpent, 2) : 0, $speed);
}

/**
Expand Down Expand Up @@ -438,8 +443,12 @@ public static function getFileInfo($task, $accessGroups) {
*/
public static function getChunkInfo($task) {
$qF = new QueryFilter(Chunk::TASK_ID, $task->getId(), "=");
$cracked = Factory::getChunkFactory()->sumFilter([Factory::FILTER => $qF], Chunk::CRACKED);
$numChunks = Factory::getChunkFactory()->countFilter([Factory::FILTER => $qF]);
$agg1 = new Aggregation(Chunk::CRACKED, "SUM");
$agg2 = new Aggregation(Chunk::CHUNK_ID, "COUNT");
$results = Factory::getChunkFactory()->multicolAggregationFilter([Factory::FILTER => $qF], [$agg1, $agg2]);

$cracked = $results[$agg1->getName()];
$numChunks = $results[$agg2->getName()];

$qF = new QueryFilter(Assignment::TASK_ID, $task->getId(), "=");
$numAssignments = Factory::getAssignmentFactory()->countFilter([Factory::FILTER => $qF]);
Expand Down
18 changes: 10 additions & 8 deletions src/inc/api/APIGetChunk.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ public function execute($QUERY = array()) {
DServerLog::log(DServerLog::TRACE, "Agent is inactive!", [$this->agent]);
$this->sendErrorResponse(PActions::GET_CHUNK, "Agent is inactive!");
}

$LOCKFILE = LOCK::CHUNKING.$task->getId();

LockUtils::get(Lock::CHUNKING);
LockUtils::get($LOCKFILE);
DServerLog::log(DServerLog::TRACE, "Retrieved lock for chunking!", [$this->agent]);
$task = Factory::getTaskFactory()->get($task->getId());
Factory::getAgentFactory()->getDB()->beginTransaction();
Expand All @@ -76,7 +78,7 @@ public function execute($QUERY = array()) {
if ($task == null) { // agent needs a new task
DServerLog::log(DServerLog::DEBUG, "Task is fully dispatched", [$this->agent]);
Factory::getAgentFactory()->getDB()->commit();
LockUtils::release(Lock::CHUNKING);
LockUtils::release($LOCKFILE);
DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]);
$this->sendResponse(array(
PResponseGetChunk::ACTION => PActions::GET_CHUNK,
Expand All @@ -93,22 +95,22 @@ public function execute($QUERY = array()) {
// this is a special case where this task is either not allowed anymore, or it has priority 0 so it doesn't get auto assigned
if (!AccessUtils::agentCanAccessTask($this->agent, $task)) {
Factory::getAgentFactory()->getDB()->commit();
LockUtils::release(Lock::CHUNKING);
LockUtils::release($LOCKFILE);
DServerLog::log(DServerLog::INFO, "Not allowed to work on requested task", [$this->agent, $task]);
DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]);
$this->sendErrorResponse(PActions::GET_CHUNK, "Not allowed to work on this task!");
}
if (TaskUtils::isSaturatedByOtherAgents($task, $this->agent)) {
Factory::getAgentFactory()->getDB()->commit();
LockUtils::release(Lock::CHUNKING);
LockUtils::release($LOCKFILE);
DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]);
$this->sendErrorResponse(PActions::GET_CHUNK, "Task already saturated by other agents, no other task available!");
}
}

if (TaskUtils::isSaturatedByOtherAgents($task, $this->agent)) {
Factory::getAgentFactory()->getDB()->commit();
LockUtils::release(Lock::CHUNKING);
LockUtils::release($LOCKFILE);
DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]);
$this->sendErrorResponse(PActions::GET_CHUNK, "Task already saturated by other agents, other tasks available!");
}
Expand All @@ -119,7 +121,7 @@ public function execute($QUERY = array()) {
if ($bestTask->getId() != $task->getId()) {
Factory::getAgentFactory()->getDB()->commit();
DServerLog::log(DServerLog::INFO, "Task with higher priority available!", [$this->agent]);
LockUtils::release(Lock::CHUNKING);
LockUtils::release($LOCKFILE);
DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]);
$this->sendErrorResponse(PActions::GET_CHUNK, "Task with higher priority available!");
}
Expand Down Expand Up @@ -150,7 +152,7 @@ public function execute($QUERY = array()) {
if ($chunk == null) {
DServerLog::log(DServerLog::DEBUG, "Could not create a chunk, task is fully dispatched", [$this->agent, $task]);
Factory::getAgentFactory()->getDB()->commit();
LockUtils::release(Lock::CHUNKING);
LockUtils::release($LOCKFILE);
DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]);
$this->sendResponse(array(
PResponseGetChunk::ACTION => PActions::GET_CHUNK,
Expand All @@ -171,7 +173,7 @@ protected function sendChunk($chunk) {
return; // this can be safely done before the commit/release, because the only sendChunk which comes really at the end check for null before, so a lock which is not released cannot happen
}
Factory::getAgentFactory()->getDB()->commit();
LockUtils::release(Lock::CHUNKING);
LockUtils::release(Lock::CHUNKING.$chunk->getTaskId());
DServerLog::log(DServerLog::TRACE, "Released lock for chunking!", [$this->agent]);
$this->sendResponse(array(
PResponseGetChunk::ACTION => PActions::GET_CHUNK,
Expand Down
28 changes: 23 additions & 5 deletions src/inc/utils/HashlistUtils.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,19 @@ public static function processZap($hashlistId, $separator, $source, $post, $file
$startTime = time();

//find the line separator
$buffer = fread($file, 1024);
$lineSeparators = array("\r\n", "\n", "\r");
$lineSeparator = "";
foreach ($lineSeparators as $sep) {
if (strpos($buffer, $sep) !== false) {
$lineSeparator = $sep;

// This will loop through the buffer until it finds a line separator
while (!feof($file)) {
$buffer = fread($file, 1024);
foreach ($lineSeparators as $ls) {
if (strpos($buffer, $ls) !== false) {
$lineSeparator = $ls;
break;
}
}
if (!empty($lineSeparator)) {
break;
}
}
Expand Down Expand Up @@ -387,7 +394,18 @@ public static function processZap($hashlistId, $separator, $source, $post, $file
$crackedIn[$l->getId()] = 0;
}
while (!feof($file)) {
$data = stream_get_line($file, 1024, $lineSeparator);
$data = '';
while(($line = stream_get_line($file, 1024, $lineSeparator)) !== false){
$data .= $line;
// seek back the length of lineSeparator and check if it indeed was a line separator
// If no lineSeperator was found, make sure not to check but just to keep reading
if (strlen($lineSeparator) > 0) {
fseek($file, strlen($lineSeparator) * -1, SEEK_CUR);
if (fread($file, strlen($lineSeparator)) === $lineSeparator) {
break;
}
}
}
if (strlen($data) == 0) {
continue;
}
Expand Down
20 changes: 11 additions & 9 deletions src/inc/utils/TaskUtils.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -1154,18 +1154,20 @@ public static function checkTask($task, $agent = null) {
else if ($task->getUsePreprocessor() && $task->getKeyspace() == DPrince::PRINCE_KEYSPACE) {
return $task;
}

$qF1 = new QueryFilter(Chunk::TASK_ID, $task->getId(), "=");
$qF2 = new QueryFilter(Chunk::PROGRESS, 10000, ">=");
$sum = Factory::getChunkFactory()->sumFilter([Factory::FILTER => [$qF1, $qF2]], Chunk::LENGTH);

$dispatched = $task->getSkipKeyspace() + $sum;
$completed = $task->getSkipKeyspace() + $sum;

// check chunks
$qF = new QueryFilter(Chunk::TASK_ID, $task->getId(), "=");
$chunks = Factory::getChunkFactory()->filter([Factory::FILTER => $qF]);
$dispatched = $task->getSkipKeyspace();
$completed = $task->getSkipKeyspace();
$qF1 = new QueryFilter(Chunk::TASK_ID, $task->getId(), "=");
$qF2 = new QueryFilter(Chunk::PROGRESS, 10000, "<");
$chunks = Factory::getChunkFactory()->filter([Factory::FILTER => [$qF1, $qF2]]);
foreach ($chunks as $chunk) {
if ($chunk->getProgress() >= 10000) {
$dispatched += $chunk->getLength();
$completed += $chunk->getLength();
}
else if ($chunk->getAgentId() == null) {
if ($chunk->getAgentId() == null) {
return $task; // at least one chunk is not assigned
}
else if (time() - max($chunk->getSolveTime(), $chunk->getDispatchTime()) > SConfig::getInstance()->getVal(DConfig::AGENT_TIMEOUT)) {
Expand Down
Loading