.gitignore000064400000000036144761607050006545 0ustar00/vendor/ /tests/ composer.lockcomposer.json000064400000001160144761607050007276 0ustar00{ "name": "boru/dhprocess", "type": "library", "autoload": { "psr-4": { "boru\\dhprocess\\": "src/" } }, "authors": [ { "name": "Daniel Hayes", "email": "dhayes@boruapps.com" } ], "require": { "boru/dhutils": "*", "guzzlehttp/guzzle": "^6.5", "clue/mq-react": "^1.5", "react/http": "^1.8", "react/async": "^2.0", "react/child-process": "^0.6.5" }, "repositories": [ { "type": "composer", "url": "https://satis.boruapps.com" } ] } demo/HelperClass.php000064400000002067144761607050010425 0ustar00"setupNoArgs"]); } return json_encode($args); } public static function getRand($minSeconds=0,$maxSeconds=2) { $minMicro = $minSeconds*1000000; $maxMicro = $maxSeconds*1000000; $rand = rand($minMicro,$maxMicro); $seconds = round($rand / 1000000,2); echo "hi\n"; return ["value"=>$rand,"seconds"=>$seconds]; } public static function run($id=0,$minRand=0.5,$maxRand=3,$error=false) { $rand = static::getRand($minRand,$maxRand); usleep($rand["value"]); $seconds = $rand["seconds"]; $result = []; $result["id"] = $id; $result["sleep"] = $seconds; if($error) { //will throw an error (that is caught..) used to test the onError callbacks $dt = new DateTime("01/2022/41"); } return $result; } }demo/demo.php000064400000002136144761607050007141 0ustar00timeout(60); TaskQueue::queueQuickConfigVisual(1) #number of threads ->maxQueued(100) #max queued at a time -- process will queue up to this amount before starting to process (it will still allow more to be queued) ->threadStatusInterval(60); #mostly visualization showing the status.. this is the interval in seconds //register requires a name and a callable Task::register("run",["HelperClass","run"]); //simple loop for demo purposes for($i=0;$i<1000;$i++) { //task to run.. first parameter is the name used in the 'register' above, and second parameter is an array of parameters that the Callable requires. //showStart, showDone, showDetailed, and 'short' are all just for display purposes and completely optional.. the process will run without them. TaskQueue::task("run",[$i,1,3]) ->showStart(false)->showDone(true)->showDetailed(false) ->short("testRun".$i); } TaskQueue::wait();instructions-composer.txt000064400000000311144761607050011703 0ustar00{ "require": { "boru/dhout": "dev-master" }, "repositories": [ { "type": "composer", "url": "https://satis.boruapps.com" } ] }src/Task.php000064400000007462144761607050006771 0ustar00work = Work::fromCallable(static::$classMap[$name],...$args); $this->work->then(function($result) { $this->callbackOnDone($result); },function($result) { $this->callbackOnError($result); }); $this->work->onStart(function() { $this->callbackOnStart(); }); $this->work->onQueued(function() { $this->callbackOnQueue(); }); $this->showStart = TaskQueue::get("start",false); $this->showQueued = TaskQueue::get("queue",false); $this->showDone = TaskQueue::get("done",false); $this->showDetailed = TaskQueue::get("detail",false); $this->name($name); $this->taskName($name); if(isset(static::$timeoutMap[$name]) && !is_null(static::$timeoutMap[$name])) { $this->timeout(static::$timeoutMap[$name]); } } public function id() { return $this->work->getId(); } public function Work() { return $this->work; } public function start() { $this->work->start(); } public function stop() { $this->work->terminate(); } public function name($name=null) { if(is_null($name)) return $this->name; $this->name = $name; if(is_null($this->description)) { $this->work->setMetaData("description",$name); } else { $this->work->setMetaData("description",$name." : ".$this->description()); } return $this; } public function timeout($timeout=null) { if(is_null($timeout)) return $this->work->getMetaData("timeout",null); $this->work->setMetaData("timeout",$timeout); return $this; } public function taskName($taskName=null) { if(is_null($taskName)) return $this->work->getMetaData("name",null); $this->work->setMetaData("name",$taskName); return $this; } public function short($name=null) { return $this->taskName($name); } public function displayName() { if(is_null($this->description)) { return $this->name(); } return $this->name()." : ".$this->description(); } public function description($description=null) { if(is_null($description)) return $this->description; $this->description = $description; $this->work->setMetaData("description",$this->name()." : ".$description); return $this; } public function setDisplay($description=null) { return $this->description($description); } }src/TaskGroup.php000064400000006372144761607050010005 0ustar00id(uniqid()); $this->name(is_null($name) ? $this->id : $name); $this->collectResults($collectResults); } public function id($id=null) { if(is_null($id)) return $this->id; $this->id = $id; return $this; } public function name($name=null) { if(is_null($name)) return $this->name; $this->name = $name; return $this; } public function displayName() { if(is_null($this->description)) { return $this->name(); } return $this->name()." : ".$this->description(); } public function description($description=null) { if(is_null($description)) return $this->description; $this->description = $description; return $this; } public function collect() { return $this->results; } public function collectResults($collectResults=null) { if(is_null($collectResults)) return $this->collectResults; $this->collectResults = $collectResults; return $this; } public function Task($name,$args=[]) { $task = new Task($name,$args); $this->pendingTasks[$task->id()] = null; $this->done=false; $task->onDone(function($result) use ($task) { if($this->collectResults) { $this->results[$task->id()] = $result; } }); $task->lastly(function($result) use ($task) { $this->checkDone($task->id()); }); if(!$this->started) { $task->onStart(function() { $this->checkStarted(); }); } $task->setVerbosity($this->showStart,$this->showQueued,$this->showDone,$this->showDetailed); return TaskQueue::queue($task,true); } private function checkDone($taskId=null) { if(!is_null($taskId)) { unset($this->pendingTasks[$taskId]); } if(empty($this->pendingTasks)) { $this->callbackOnDone($this->results); } } private function checkStarted() { if($this->started) { return; } $this->started=true; $this->callbackOnStart(); } }src/TaskQueue.php000064400000023324144761607050007771 0ustar00false, "queue"=>false, "done"=>false, "detail"=>false, ]; /** @var Config */ private static $config; private static $expected; /** @var Queue */ private static $queue; private static $collectionMap = []; /** * @param int|null $expected * @param Queue $queue (null) */ public static function expected($expected=null) { if(is_null($expected)) return static::$expected; static::$expected = $expected; if(!is_null(static::$queue)) { static::$queue->setExpected($expected); } } /** * * @param Task $task * @param mixed $taskGroup * @param mixed $queue * @return Task */ public static function queue(Task $task,$skipCollect=false) { if(is_null(static::$queue)) { static::$queue = static::init(); } if(!$skipCollect) { $task->lastly(function($result) use ($task) { static::$collectionMap[$task->id()] = $result; $task->Work()->destroy(); }); } static::$queue->queue($task->Work()); return $task; } /** * * @param Task $task * @param mixed $taskGroup * @param mixed $queue * @return Task */ public static function add(Task $task) { return static::queue($task); } /** * * @param null|Queue $queue * @return true|null */ public static function wait() { if(!is_null(static::$queue)) { static::$queue->wait(); return true; } return null; } /** * Collects data from the Queue. Data is only collectable if TaskQueue::discardWork is set intentionally to false, work results are otherwise discarded. * * You can collect results either by using an onDone handler and collecting it that way, or using the &$collectableArray applied to ::init() and/or ::create() * @param null|Queue $queue * @return null|mixed */ public static function collect() { if(!is_null(static::$queue)) { return static::$queue->collect(); } return null; } /** * Init the task queue. * @param array $configOptions See \boru\dhprocess\config\Config::$data for available options * @param mixed $collectionArray If you want to collect results from the queue, you can pass an array by reference here. The array will be populated with the results of the work. * @return Queue */ public static function init($configOptions=[],&$collectionArray=null) { static::initConfig($configOptions,false); static::$config->applyOptions($configOptions); static::$queue = static::create(static::$config,$collectionArray); return static::$queue; } public static function create($config=null,&$collectionArray=null) { if(is_array($config)) { $config = new Config($config); } if($config instanceof Config) { } else { $config = new Config(); } $queue = new Queue($config); if(!is_null(static::$expected)) { static::expected(static::$expected,$queue); } if(!is_null($collectionArray)) { static::$collectionMap = &$collectionArray; $queue->set("discardOnDone",true); } //$queue->setWorkerLogDir("_logs/"); //$queue->workerManager()->setLog("test.log"); return $queue; } public static function getVerbosity() { return static::config()->verbosity(); } public static function setVerbosity($start=false,$queue=false,$done=false,$detail=false) { static::config()->set("start",$start); static::config()->set("queue",$queue); static::config()->set("done",$done); static::config()->set("detail",$detail); } /** * * @param mixed $name * @param array $args * @param mixed $taskGroup * @param mixed $queue * @return Task */ public static function Task($name,$args=[]) { $task = new Task($name,$args); $verbosity = static::getVerbosity(); $task->setVerbosity($verbosity["start"],$verbosity["queue"],$verbosity["done"],$verbosity["detail"]); return static::queue($task,true); } public static function Group($groupName,$collectResults=false) { return new TaskGroup($groupName,$collectResults); } //Config stuff, including backward compatibility public static function config($options=null) { static::initConfig([],false); if(is_null($options)) return static::$config; static::$config->applyOptions($options); } public static function get($key=null,$value=null) { static::initConfig([],false); return static::$config->get($key,$value); } public static function set($key,$value=null) { static::initConfig([],false); return static::$config->set($key,$value); } public static function initConfig($configOptions=null,$force=true) { if(is_null(static::$config) || $force) { static::$config = new Config(); } if(is_array($configOptions)) { static::$config->applyOptions($configOptions); } } /** * @param null|array $queueConfig * @return QueueConfig */ public static function queueConfig($queueConfig=null) { if(is_null($queueConfig)) { if(!is_null(static::$queue)) { return static::$queue->config(); } else { return static::config(); } } else { static::config()->applyOptions($queueConfig); if(!is_null(static::$queue)) { static::$queue->config()->applyOptions($queueConfig); } return static::config(); } } /** * Quick configuration that enables visualization * @param mixed $numWorkers * @param mixed $maxQueued * @return Config */ public static function queueQuickConfigVisual($numWorkers=null,$maxQueued=null) { static::config(); if(is_null($numWorkers)) { $numWorkers = intval(exec("nproc")); } if($numWorkers<=0) { $numWorkers=3; } static::config()->set("numWorkers",$numWorkers); static::config()->set("visualize",true); static::config()->set("extendedBar",true); static::config()->set("maxQueued",is_null($maxQueued) ? ($numWorkers+5) : $maxQueued); return static::config(); } /** * Quick configuration that disables visualization * @param mixed $numWorkers * @param mixed $maxQueued * @return QueueConfig */ public static function queueQuickConfigNoVisual($numWorkers=null,$maxQueued=null) { if(is_null($numWorkers)) { $numWorkers = intval(exec("nproc")); } if($numWorkers<=0) { $numWorkers=3; } static::config(); static::config()->set("numWorkers",$numWorkers); static::config()->set("visualize",false); static::config()->set("extendedBar",false); static::config()->set("maxQueued",is_null($maxQueued) ? ($numWorkers+5) : $maxQueued); return static::config(); } /** * @param null|array $workerConfig * @return WorkerConfig */ public static function workerConfig($workerConfig=null) { if(is_null($workerConfig)) { if(!is_null(static::$queue)) { return static::$queue->config(); } else { return static::$config; } } else { static::$config->applyOptions($workerConfig); if(!is_null(static::$queue)) { static::$queue->config()->applyOptions($workerConfig); } return static::$config; } } /** * Quick Worker config * @param mixed $bootstrapFile * @param mixed $logDir * @param mixed $logPerWorker * @return WorkerConfig */ public static function workerQuickConfig($bootstrapFile=null,$logDir="_logs",$logPerWorker=null) { static::config(); if(!is_null($bootstrapFile)) { static::config()->set("bootstrapFile",$bootstrapFile); } if(!is_null($logDir)) { static::config()->set("logDir",$logDir); } if(!is_null($logPerWorker)) { static::config()->set("logPerWorker",$logPerWorker); } return static::config(); } /** * @param null|array $workerConfig * @return WorkConfig */ public static function workConfig($workConfig=null) { if(is_null($workConfig)) { if(!is_null(static::$queue)) { return static::$queue->config(); } else { return static::$config; } } else { static::$config->applyOptions($workConfig); if(!is_null(static::$queue)) { static::$queue->config()->applyOptions($workConfig); } return static::$config; } } }src/config/BaseConfig.php000064400000002600144761607050011321 0ustar00config($options); } /** * Configure the object with an array of $options * @param array $options * @return $this */ public function config($options=[]) { foreach($options as $k=>$v) { $this->set($k,$v); } return $this; } /** * Get the value for $key. if $key is null, the whole array is returned. * @param null|mixed $key if null, returns the entire array. Otherwise, a dot-separated key to find in the configuration array * @param mixed $default default return value if $key isn't found/set * @return mixed */ public function get($key=null,$default=null) { if(is_null($key)) { return $this->data; } return dhGlobal::getVal($this->data,$key,$default); } /** * Set the value of $key * @param mixed $key * @param mixed $value * @return $this */ public function set($key,$value=null) { dhGlobal::dotAssign($this->data,$key,$value); return $this; } }src/config/Config.php000064400000032343144761607050010535 0ustar00 false, "queue" => false, "done" => false, "detail" => false, //queue config "numWorkers" => 3, "maxQueued" => 0, "visualize" => true, "log" => false, "extendedBar" => false, "threadStatusInterval" => 60, "maxIdleTime" => 300, "idleUpdateInterval" => 10, "threadTemplate" => "[{lastTime|since|timeFormat} {workDone} {status|max:10|pad:10}]", "summaryTemplate" => "{id} | {lastTime|since|timeFormat|pad:8} | Mem:{memUsed|formatBytes} / done:{workDone|pad:3} | {description|max:100}", "idleUpdateTemplate" => "Queued: {queued}, Active: {active} | Workers: {workers} total, {activeWorkers} active", //worker config "bootstrapFile" => null, "bootstrapCallable" => null, "timeout" => 0, "killOnError" => false, "maxWork" => 0, "onLog" => null, //work config "retriesOnError" => 0, "discardOnDone" => true, //loging config "logDir" => null, "logPrint" => false, "logLevels" => [ "comms" => true, "error" => true, "warning" => true, "notice" => true, "info" => true, "debug" => false, ], "errorExceptions" => false, ]; private static $logLevelCompressMap = [ "comms" => "c", "error" => "e", "warning" => "w", "notice" => "n", "info" => "i", "debug" => "d", ]; private static $logLevelCompressReverseMap = [ "c" => "comms", "e" => "error", "w" => "warning", "n" => "notice", "i" => "info", "d" => "debug", ]; public function __construct($options=[]) { $this->set("onLog",function($proc,$message) { $this->onLogDefault($proc,$message); }); $this->applyOptions($options); } public function applyOptions($options=[]) { if(is_object($options)) { $this->applyOptions($options->get()); } elseif(is_array($options)) { foreach($options as $k=>$v) { $this->set($k,$v); } } } public function verbosity($verbosity=null) { if(is_null($verbosity)) return [ "start"=>$this->get("start"), "queue"=>$this->get("queue"), "done"=>$this->get("done"), "detail"=>$this->get("detail"), ]; foreach($verbosity as $k=>$v) { $this->set($k,$v); } return $this; } public function get($key=null,$default=null) { if(is_null($key)) return $this->data; return dhGlobal::dotGet($this->data,$key,$default); } public function set($key,$value=null) { if(is_array($key)) { foreach($key as $k=>$v) { $this->set($k,$v); } return $this; } if(strpos($key,".")!==false) { dhGlobal::dotAssign($this->data,$key,$value); return $this; } $this->data[$key] = $value; //echo "Set $key to $value\n"; return $this; } public function getLogLevels() { return $this->get("logLevels"); } public function logLevelsToTransport() { $logLevels = $this->get("logLevels"); $string = ""; foreach($logLevels as $k=>$v) { if($v && isset(static::$logLevelCompressMap[$k])) { $string.= static::$logLevelCompressMap[$k]; } } return $string; } public function logLevelsFromTransport($string) { $logLevels = $this->getLogLevels(); foreach($logLevels as $k=>$v) { $logLevels[$k] = false; } for($i=0;$iset("logLevels",$logLevels); return $this; } /** * Set or Get the value of numWorkers * @param mixed $numWorkers * @param mixed $default * @return mixed */ public function numWorkers($numWorkers=null,$default=null) { if(is_null($numWorkers)) return $this->get("numWorkers",$default); $this->set("numWorkers",$numWorkers); return $this; } /** * Set or Get the value of maxQueued * @param mixed $maxQueued * @param mixed $default * @return mixed */ public function maxQueued($maxQueued=null,$default=null) { if(is_null($maxQueued)) return $this->get("maxQueued",$default); $this->set("maxQueued",$maxQueued); return $this; } /** * Set or Get the value of visualize * @param mixed $visualize * @param mixed $default * @return mixed */ public function visualize($visualize=null,$default=null) { if(is_null($visualize)) return $this->get("visualize",$default); $this->set("visualize",$visualize); return $this; } /** * Set or Get the value of extendedBar * @param mixed $extendedBar * @param mixed $default * @return mixed */ public function extendedBar($extendedBar=null,$default=null) { if(is_null($extendedBar)) return $this->get("extendedBar",$default); $this->set("extendedBar",$extendedBar); return $this; } /** * Set or Get the value of log * @param mixed $log * @param mixed $default * @return self|mixed */ public function log($log=null,$default=null) { if(is_null($log)) return $this->get("log",$default); $this->set("log",$log); return $this; } /** * Set or Get the value of threadStatusInterval * @param mixed $threadStatusInterval * @param mixed $default * @return self|int|false */ public function threadStatusInterval($threadStatusInterval=null,$default=null) { if(is_null($threadStatusInterval)) return $this->get("threadStatusInterval",$default); $this->set("threadStatusInterval",$threadStatusInterval); return $this; } /** * Set or Get the value of maxIdleTime * @param mixed $maxIdleTime * @param mixed $default * @return self|int|false */ public function maxIdleTime($maxIdleTime=null,$default=null) { if(is_null($maxIdleTime)) return $this->get("maxIdleTime",$default); $this->set("maxIdleTime",$maxIdleTime); return $this; } /** * Set or Get the value of idleUpdateInterval * @param mixed $idleUpdateInterval * @param mixed $default * @return self|int|false */ public function idleUpdateInterval($idleUpdateInterval=null,$default=null) { if(is_null($idleUpdateInterval)) return $this->get("idleUpdateInterval",$default); $this->set("idleUpdateInterval",$idleUpdateInterval); return $this; } /** * Set or Get the value of threadTemplate * @param mixed $threadTemplate * @param mixed $default * @return self|string|false */ public function threadTemplate($threadTemplate=null,$default=null) { if(is_null($threadTemplate)) return $this->get("threadTemplate",$default); $this->set("threadTemplate",$threadTemplate); return $this; } /** * Set or Get the value of summaryTemplate * @param mixed $summaryTemplate * @param mixed $default * @return self|string|false */ public function summaryTemplate($summaryTemplate=null,$default=null) { if(is_null($summaryTemplate)) return $this->get("summaryTemplate",$default); $this->set("summaryTemplate",$summaryTemplate); return $this; } /** * Set or Get the value of idleUpdateTemplate * @param mixed $idleUpdateTemplate * @param mixed $default * @return self|string|false */ public function idleUpdateTemplate($idleUpdateTemplate=null,$default=null) { if(is_null($idleUpdateTemplate)) return $this->get("idleUpdateTemplate",$default); $this->set("idleUpdateTemplate",$idleUpdateTemplate); return $this; } /** * Set or Get the value of retriesOnError * @param mixed $retriesOnError * @param mixed $default * @return mixed */ public function retriesOnError($retriesOnError=null,$default=null) { if(is_null($retriesOnError)) return $this->get("retriesOnError",$default); $this->set("retriesOnError",$retriesOnError); return $this; } /** * Set or Get the value of discardOnDone * @param mixed $discardOnDone * @param mixed $default * @return mixed */ public function discardOnDone($discardOnDone=null,$default=null) { if(is_null($discardOnDone)) return $this->get("discardOnDone",$default); $this->set("discardOnDone",$discardOnDone); return $this; } /** * Set or Get the value of bootstrapFile * @param mixed $bootstrapFile * @param mixed $default * @return mixed */ public function bootstrapFile($bootstrapFile=null,$default=null) { if(is_null($bootstrapFile)) return $this->get("bootstrapFile",$default); $this->set("bootstrapFile",$bootstrapFile); return $this; } /** * Set or Get the value of bootstrapCallable * @param mixed $bootstrapCallable * @param mixed $default * @return mixed */ public function bootstrapCallable($bootstrapCallable=null,$default=null) { if(is_null($bootstrapCallable)) return $this->get("bootstrapCallable",$default); $this->set("bootstrapCallable",$bootstrapCallable); return $this; } /** * Set or Get the value of timeout * @param mixed $timeout * @param mixed $default * @return mixed */ public function timeout($timeout=null,$default=null) { if(is_null($timeout)) return $this->get("timeout",$default); $this->set("timeout",$timeout); return $this; } /** * Set or Get the value of killOnError * @param mixed $killOnError * @param mixed $default * @return mixed */ public function killOnError($killOnError=null,$default=null) { if(is_null($killOnError)) return $this->get("killOnError",$default); $this->set("killOnError",$killOnError); return $this; } /** * Set or Get the value of maxWork * @param mixed $maxWork * @param mixed $default * @return mixed */ public function maxWork($maxWork=null,$default=null) { if(is_null($maxWork)) return $this->get("maxWork",$default); $this->set("maxWork",$maxWork); return $this; } /** * Set or Get the value of logDir * @param mixed $logDir * @param mixed $default * @return mixed */ public function logDir($logDir=null,$default=null) { if(is_null($logDir)) return $this->get("logDir",$default); if($logDir!==false) { if(substr($logDir,0,1) != "/") { $cwd = getcwd(); if(substr($cwd,-1) != "/") { $cwd .="/"; } $logDir = $cwd.$logDir; } } $this->set("logDir",$logDir); return $this; } /** * Set or Get the value of logPerWorker * @param null|bool $logPerWorker * @param null|bool $default * @return mixed */ public function logPerWorker($logPerWorker=null,$default=null) { if(is_null($logPerWorker)) return $this->get("logPerWorker",$default); $this->set("logPerWorker",$logPerWorker); return $this; } /** * Set or Get the value of onLog * @param null|callable $onLogCallable * @return mixed */ public function onLog($onLogCallable=null) { if(is_null($onLogCallable)) return $this->get("onLog"); $this->set("onLog",$onLogCallable); return $this; } public function onLogDefault($proc,$message) { if($this->get("logPrint",false) && $this->get("logLevels.".strtolower($message->get("level")),true)) { $level = "|".dhGlobal::padLeft(strtoupper($message->get("level")),7," ")." | "; dhGlobal::outLine($level,"WC-".$proc->getId()." | ",$message->get("message")); } } }src/config/QueueConfig.php000064400000011354144761607050011541 0ustar003, "maxQueued"=>0, "visualize"=>true, "log"=>false, "extendedBar"=>false, "threadStatusInterval"=>60, "maxIdleTime"=>300, "idleUpdateInterval"=>10, "threadTemplate"=>null, "summaryTemplate"=>null, "idleUpdateTemplate"=>null, ]; /** * Set or Get the value of numWorkers * @param mixed $numWorkers * @param mixed $default * @return mixed */ public function numWorkers($numWorkers=null,$default=null) { if(is_null($numWorkers)) return $this->get("numWorkers",$default); $this->set("numWorkers",$numWorkers); return $this; } /** * Set or Get the value of maxQueued * @param mixed $maxQueued * @param mixed $default * @return mixed */ public function maxQueued($maxQueued=null,$default=null) { if(is_null($maxQueued)) return $this->get("maxQueued",$default); $this->set("maxQueued",$maxQueued); return $this; } /** * Set or Get the value of visualize * @param mixed $visualize * @param mixed $default * @return mixed */ public function visualize($visualize=null,$default=null) { if(is_null($visualize)) return $this->get("visualize",$default); $this->set("visualize",$visualize); return $this; } /** * Set or Get the value of extendedBar * @param mixed $extendedBar * @param mixed $default * @return mixed */ public function extendedBar($extendedBar=null,$default=null) { if(is_null($extendedBar)) return $this->get("extendedBar",$default); $this->set("extendedBar",$extendedBar); return $this; } /** * Set or Get the value of log * @param mixed $log * @param mixed $default * @return self|mixed */ public function log($log=null,$default=null) { if(is_null($log)) return $this->get("log",$default); $this->set("log",$log); return $this; } /** * Set or Get the value of threadStatusInterval * @param mixed $threadStatusInterval * @param mixed $default * @return self|int|false */ public function threadStatusInterval($threadStatusInterval=null,$default=null) { if(is_null($threadStatusInterval)) return $this->get("threadStatusInterval",$default); $this->set("threadStatusInterval",$threadStatusInterval); return $this; } /** * Set or Get the value of maxIdleTime * @param mixed $maxIdleTime * @param mixed $default * @return self|int|false */ public function maxIdleTime($maxIdleTime=null,$default=null) { if(is_null($maxIdleTime)) return $this->get("maxIdleTime",$default); $this->set("maxIdleTime",$maxIdleTime); return $this; } /** * Set or Get the value of idleUpdateInterval * @param mixed $idleUpdateInterval * @param mixed $default * @return self|int|false */ public function idleUpdateInterval($idleUpdateInterval=null,$default=null) { if(is_null($idleUpdateInterval)) return $this->get("idleUpdateInterval",$default); $this->set("idleUpdateInterval",$idleUpdateInterval); return $this; } /** * Set or Get the value of threadTemplate * @param mixed $threadTemplate * @param mixed $default * @return self|string|false */ public function threadTemplate($threadTemplate=null,$default=null) { if(is_null($threadTemplate)) return $this->get("threadTemplate",$default); $this->set("threadTemplate",$threadTemplate); return $this; } /** * Set or Get the value of summaryTemplate * @param mixed $summaryTemplate * @param mixed $default * @return self|string|false */ public function summaryTemplate($summaryTemplate=null,$default=null) { if(is_null($summaryTemplate)) return $this->get("summaryTemplate",$default); $this->set("summaryTemplate",$summaryTemplate); return $this; } /** * Set or Get the value of idleUpdateTemplate * @param mixed $idleUpdateTemplate * @param mixed $default * @return self|string|false */ public function idleUpdateTemplate($idleUpdateTemplate=null,$default=null) { if(is_null($idleUpdateTemplate)) return $this->get("idleUpdateTemplate",$default); $this->set("idleUpdateTemplate",$idleUpdateTemplate); return $this; } }src/config/WorkConfig.php000064400000001753144761607050011401 0ustar000, "discardOnDone"=>false, ]; /** * Set or Get the value of retriesOnError * @param mixed $retriesOnError * @param mixed $default * @return mixed */ public function retriesOnError($retriesOnError=null,$default=null) { if(is_null($retriesOnError)) return $this->get("retriesOnError",$default); $this->set("retriesOnError",$retriesOnError); return $this; } /** * Set or Get the value of discardOnDone * @param mixed $discardOnDone * @param mixed $default * @return mixed */ public function discardOnDone($discardOnDone=null,$default=null) { if(is_null($discardOnDone)) return $this->get("discardOnDone",$default); $this->set("discardOnDone",$discardOnDone); return $this; } }src/config/WorkerConfig.php000064400000007624144761607050011733 0ustar00null, "bootstrapCallable"=>null, "timeout"=>0, "killOnError"=>false, "maxWork"=>0, "logDir"=>null, "onLog"=>null, ]; /** * Set or Get the value of bootstrapFile * @param mixed $bootstrapFile * @param mixed $default * @return mixed */ public function bootstrapFile($bootstrapFile=null,$default=null) { if(is_null($bootstrapFile)) return $this->get("bootstrapFile",$default); $this->set("bootstrapFile",$bootstrapFile); return $this; } /** * Set or Get the value of bootstrapCallable * @param mixed $bootstrapCallable * @param mixed $default * @return mixed */ public function bootstrapCallable($bootstrapCallable=null,$default=null) { if(is_null($bootstrapCallable)) return $this->get("bootstrapCallable",$default); $this->set("bootstrapCallable",$bootstrapCallable); return $this; } /** * Set or Get the value of timeout * @param mixed $timeout * @param mixed $default * @return mixed */ public function timeout($timeout=null,$default=null) { if(is_null($timeout)) return $this->get("timeout",$default); $this->set("timeout",$timeout); return $this; } /** * Set or Get the value of killOnError * @param mixed $killOnError * @param mixed $default * @return mixed */ public function killOnError($killOnError=null,$default=null) { if(is_null($killOnError)) return $this->get("killOnError",$default); $this->set("killOnError",$killOnError); return $this; } /** * Set or Get the value of maxWork * @param mixed $maxWork * @param mixed $default * @return mixed */ public function maxWork($maxWork=null,$default=null) { if(is_null($maxWork)) return $this->get("maxWork",$default); $this->set("maxWork",$maxWork); return $this; } /** * Set or Get the value of logDir * @param mixed $logDir * @param mixed $default * @return mixed */ public function logDir($logDir=null,$default=null) { if(is_null($logDir)) return $this->get("logDir",$default); if($logDir!==false) { if(substr($logDir,0,1) != "/") { $cwd = getcwd(); if(substr($cwd,-1) != "/") { $cwd .="/"; } $logDir = $cwd.$logDir; } } $this->set("logDir",$logDir); return $this; } /** * Set or Get the value of logPerWorker * @param null|bool $logPerWorker * @param null|bool $default * @return mixed */ public function logPerWorker($logPerWorker=null,$default=null) { if(is_null($logPerWorker)) return $this->get("logPerWorker",$default); $this->set("logPerWorker",$logPerWorker); return $this; } /** * Set or Get the value of onLog * @param null|callable $onLogCallable * @return mixed */ public function onLog($onLogCallable=null) { if(is_null($onLogCallable)) return $this->get("onLog"); $this->set("onLog",$onLogCallable); return $this; } public function callOnLog($process,$message) { $callable = $this->get("onLog"); if(is_callable($callable)) { $callable($process,$message); } elseif(is_null($callable)) { if($message->get("level") == "error") { dhGlobal::outLine("****",strtoupper($message->get("level")),"|","WC-".$process->getId(),"|",$message->get("message")); } } } }src/message/ExceptionMessage.php000064400000001131144761607050012741 0ustar00messageObject = $messageObject; parent::__construct($messageObject->toPacket(), $code, $previous); } public function messageObject() { return $this->messageObject; } }src/message/ExceptionResult.php000064400000001251144761607050012636 0ustar00resultObject = $resultObject; parent::__construct($resultObject->get("message"), $code, $previous); } public function resultObject() { return $this->resultObject; } public function setMessage($message) { $this->message = $message; } }src/message/Message.php000064400000016213144761607050011071 0ustar00encoding = $encoding; } public function isValid() { return !empty($this->data); } public function isError() { return !empty($this->error); } public function toDisplay($len=150) { $packet = $this->toPacket(); if(strlen($packet)>$len) { $packet = substr($packet,0,$len-4)." ..."; } return $packet; } public function toPacket() { if(empty($this->data) && empty($this->error)) { return false; } $packet = json_encode(["workId"=>$this->workId,"message"=>$this->data,"error"=>$this->error,"time"=>$this->time,"mem"=>$this->mem]); $packetData = static::encode($packet,$this->encoding); $packet = static::$startDelimiter; $packet .= $this->encoding; $packet .= static::$typeDelimiter; $packet .= $packetData; $packet .= static::$endDelimiter; return $packet; } public static function fromPacket($rawPacket) { $instance = new self(); $delimStart = static::$startDelimiter; $delimEnd = static::$endDelimiter; $delimType = static::$typeDelimiter; if(substr($rawPacket,0,strlen($delimStart)) == $delimStart && substr($rawPacket,-strlen($delimEnd)) == $delimEnd) { $rawPacket = substr($rawPacket,1); $rawPacket = substr($rawPacket,0,-1); $parts = explode($delimType,$rawPacket,2); if(count($parts) !== 2) { return false; } $instance->setEncoding($parts[0]); $instance->setRawPacket($parts[1]); $json = json_decode($parts[1],true); if(is_array($json)) { $instance->setWorkId(dhGlobal::getVal($json,"workId",null)); $instance->setData(dhGlobal::getVal($json,"message",null)); $instance->setError(dhGlobal::getVal($json,"error",null)); $instance->time(dhGlobal::getVal($json,"time",null)); $instance->mem(dhGlobal::getVal($json,"mem",null)); } return $instance; } return false; } public static function encode($packet,$type=Message::ENCODING_BASE64) { if($type == Message::ENCODING_BASE64) { return base64_encode($packet); } return $packet; } public static function decode($packet,$type=Message::ENCODING_BASE64) { if($type == Message::ENCODING_BASE64) { return base64_decode($packet); } return $packet; } public function set($key,$val="") { dhGlobal::dotAssign($this->data,$key,$val); return $this; } public function get($key=null,$default=null,$exists=false) { if(is_null($key)) { if($exists) { return !empty($this->data) ? true : false; } else { return !empty($this->data) ? $this->data : $default; } } $uniqueid = uniqid("getArray",true); if(($check = dhGlobal::getDot($this->data,$key,$uniqueid,".")) !== $uniqueid) { return $exists ? true : $check; }; return $default; } public function exists($key=null) { return $this->get($key,null,true); } public function remove($key) { unset($this->data[$key]); return $this; } /** * Get the value of encoding */ public function getEncoding() { return $this->encoding; } /** * Set the value of encoding * * @return self */ public function setEncoding($encoding) { $this->encoding = $encoding; return $this; } /** * Get the value of workId */ public function getWorkId() { return $this->workId; } /** * Set the value of workId * * @return self */ public function setWorkId($workId) { $this->workId = $workId; return $this; } /** * Get the value of rawPacket */ public function getRawPacket() { return $this->rawPacket; } /** * Set the value of rawPacket * * @return self */ public function setRawPacket($rawPacket) { $this->rawPacket = $rawPacket; return $this; } /** * Get the value of data */ public function getData() { return $this->data; } /** * Set the value of data * * @return self */ public function setData($data) { $this->data = $data; return $this; } /** * Get the value of error * * @return mixed */ public function getError() { return $this->error; } /** * Set the value of error * * @param mixed $error * @return self */ public function setError($error) { $this->error = $error; return $this; } public function time($time=null) { if(is_null($time)) return $this->time; $this->time = $time; } public function mem($mem=null) { if(is_null($mem)) return $this->mem; $this->mem = $mem; } public function data($dotKey=null,$default=null) { if(is_null($dotKey)) { return $this->getData(); } if(is_null($this->data) || empty($this->data)) { return $default; } return dhGlobal::getVal($this->data,$dotKey,$default); } public function error($dotKey=null,$default=null) { if(is_null($dotKey)) { return $this->getError(); } if(is_null($this->error) || empty($this->error)) { return $default; } return dhGlobal::getVal($this->error,$dotKey,$default); } /** * Get a Result representation of the data result * @return Result|false */ public function getDataObject() { if(!is_null($this->data)) { return new Result($this->data,$this->workId,false); } return false; } /** * Get a Result representation of the error result * @return Result|false */ public function getErrorObject() { if(!is_null($this->error)) { return new Result($this->error,$this->workId,true); } return false; } }src/message/Result.php000064400000002460144761607050010762 0ustar00workId = $workId; $this->isError = $isError; } public function isError() { return $this->isError; } public function id() { return $this->workId; } public function result($dotKey=null,$default=null) { if(is_null($dotKey)) { return $this->get("result",$default); } $newKey = "result.".$dotKey; return $this->get($newKey,$default); } public function error($dotKey=null,$default=null) { if(is_null($dotKey)) { return $this->get("error",$default); } $newKey = "error.".$dotKey; return $this->get($newKey,$default); } /** * Get the value of workId * @return mixed */ public function getWorkId() { return $this->workId; } /** * Set the value of workId * @param mixed $workId * @return self */ public function setWorkId($workId) { $this->workId = $workId; return $this; } }src/process/Process.php000064400000016616144761607050011164 0ustar00setCommand($command); $this->setArgs($args); $this->setMeta($meta); //sets all the handlers/etc $this->setPid(uniqid()); $this->buffer = new Buffer(); } /** * Starts the Child Process, returns the Promise for easy chaining * @return Promise|false * @throws UnexpectedValueException * @throws RuntimeException */ public function start() { $args = ''; if(!empty($this->args)) { $args = ProcessQueue::pack($this->args); } $this->process = new \React\ChildProcess\Process('exec '.$this->command.' '.$args); $this->process->start(); $this->setupProcessChannels(); $this->onStart(); return $this->Promise(); } public function stop() { $this->running=false; $this->process->terminate(); } public function terminate() { $this->running=false; $this->process->terminate(); if(!is_null($this->deferred)) { $this->deferred->reject("terminated"); } } public function get() { return [ "pid"=>$this->pid, "running"=>$this->running, "done"=>$this->done, "meta"=>$this->meta, "args"=>$this->args, "command"=>$this->command, "buffer"=>$this->buffer, "exitCode"=>$this->exitCode, "termSignal"=>$this->termSignal, "data"=>$this->data, ]; } public function write($data) { return $this->process->stdin->write($data); } /** * Get the buffer * @return Buffer */ public function Buffer() { return $this->buffer; } /** * Wrapper for \React\Promise\Promise::then() * @param callable|null $onFulfilled * @param callable|null $onRejected * @return PromiseInterface */ public function then(callable $onFulfilled=null,callable $onRejected=null) { return $this->Promise()->then($onFulfilled,$onRejected); } /** Handler Methods */ public function onDone($exitCode=null,$termSignal=null) { $this->done = true; $this->running = false; $this->ready = false; $this->exitCode = $exitCode; $this->termSignal = $termSignal; $this->deferred->resolve($this->buffer); if(!is_null($this->onDone)) { $handler = $this->onDone; $handler($this,$exitCode,$termSignal); } } public function onError($chunk=null) { if(!is_null($this->onError)) { $handler = $this->onError; $handler($this,$chunk); } if($this->useBuffer) { $this->buffer->err($chunk); } } public function onData($chunk=null) { if($this->isMessageFrames) { $data = !empty($this->messageBuffer) ? $this->messageBuffer : ""; $data.=$chunk; if(($message = Message::fromPacket($data)) !== false) { $chunk = $data; $this->messageBuffer = null; } else { $this->messageBuffer = $data; return; } } if(!is_null($this->onData)) { $handler = $this->onData; $handler($this,$chunk); } if($this->useBuffer) { $this->buffer->out($chunk); } } public function onStart() { $this->running = true; $this->ready = false; if(!is_null($this->onStart)) { $handler = $this->onStart; $handler($this); } } public function onStreamClose($type) { if(!is_null($this->onStreamClose)) { $handler = $this->onStreamClose; $handler($this,$type); } } public function onStreamError($type,$chunk=null) { if(!is_null($this->onStreamError)) { $handler = $this->onStreamError; $handler($this,$type,$chunk); } } /** * Set the stdout/stderr channel listening * @return void */ private function setupProcessChannels() { $this->process->stdout->on('data', function($chunk) { $this->onData(trim($chunk)); }); $this->process->stdout->on('error', function($chunk) { $this->onStreamError("stdout",$chunk); }); $this->process->stdout->on('close', function() { $this->onStreamClose("stderr"); }); $this->process->stderr->on('data', function($chunk) { $this->onError(trim($chunk)); }); $this->process->stderr->on('error', function($chunk) { $this->onStreamError("stderr",$chunk); }); $this->process->stderr->on('close', function() { $this->onStreamClose("stderr"); }); $this->process->on('exit', function($exitCode,$termSignal) { $this->onDone(); }); } /** * Set the value of meta * @param mixed $meta * @return self */ public function setMeta($meta) { $this->meta = $meta; $this->onData = dhGlobal::getVal($meta, "onData", null); $this->onError = dhGlobal::getVal($meta, "onError", null); $this->onStart = dhGlobal::getVal($meta, "onStart", null); $this->onDone = dhGlobal::getVal($meta, "onDone", null); $this->deferred = dhGlobal::getVal($meta, "deferred",null); $this->onStreamClose = dhGlobal::getVal($meta, "onStreamClose",null); $this->onStreamError = dhGlobal::getVal($meta, "onStreamError",null); $this->useBuffer = dhGlobal::getVal($meta, "useBuffer", true); $this->isMessageFrames = dhGlobal::getVal($meta, "isMessageFrames", false); if(is_null($this->deferred)) { $this->deferred = new Deferred(); } return $this; } public function jsonSerialize($array=null) { if(is_null($array)) { $array = $this->get(); } return $array; } public function __toString() { return json_encode($this); } }src/process/WorkerProcess.php000064400000027120144761607050012346 0ustar00setPid(uniqid()); $this->queueWorker = $queueWorker; $this->queue = $queue; $workerOptions = []; //$queue->get("timeout"),$queue->get("bootstrapFile"),$queue->get("logDir") $workerOptions[] = "id:".$this->getPid(); if(!is_null($queue)) { $workerOptions[] = "qid:".$queue->id(); } if(!is_null($queueWorker)) { $workerOptions[] = "wid:".$queueWorker->getId(); } if(($bootstrap = $queue->get("bootstrapFile",false)) !== false) { if(is_array($bootstrap)) { $bsString = implode(" ",$bootstrap); } else { $bsString = $bootstrap; } $workerOptions[]="inc:".$bsString; } if($queue->get("logDir")) { $workerOptions[] = "log:".QueueLogger::getLogFile("default"); } $logSettingString = $queue->config()->logLevelsToTransport(); if(!empty($logSettingString)) { $workerOptions[] = "ll:".$logSettingString; } $workerString = empty($workerOptions) ? "" : implode(" ",$workerOptions); if(($workerFile = dhGlobal::fileIfExists(__DIR__."/../worker/WorkerClient.php")) === false) { throw new \Exception("Cannot find worker script."); }; $this->setCommand("php -f ".$workerFile->path()." ".$workerString); //echo $this->getCommand()."\n"; //exit(); $this->setArgs([]); $this->setMeta($meta); //sets all the handlers/etc $this->setMetaData("lastTime",microtime(true)); $this->setMetaData("workerTimeout",$queue->get("timeout")); } /** * Get the queueId */ public function getQueueId() { return $this->queue->id(); } public function queueId() { return $this->getQueueId(); } public function getQueueConfig() { return $this->queue->getQueueConfig(); } /** * Get the queueWorkerId */ public function getQueueWorkerId() { return $this->queueWorker->getId(); } public function queueWorkerId() { return $this->getQueueWorkerId(); } /** * Starts the Child Process, returns the Promise for easy chaining * @return Promise|false * @throws UnexpectedValueException * @throws RuntimeException */ public function start() { $args = ''; if(!empty($this->args)) { $args = ProcessQueue::pack($this->args); } $this->process = new \React\ChildProcess\Process('exec '.$this->command.' '.$args); $this->process->start(); $this->setupProcessChannels(); $this->onStart(); return $this->Promise(); } public function stop() { $this->running=false; $this->process->terminate(); $this->process=null; } public function terminate() { $this->running=false; if(!is_null($this->process)) { $this->process->terminate(); } if(!is_null($this->deferred)) { $this->deferred->reject(new \Exception("terminated")); } $this->process=null; } public function get() { return [ "pid"=>$this->pid, "running"=>$this->running, "done"=>$this->done, "meta"=>$this->meta, "args"=>$this->args, "command"=>$this->command, "exitCode"=>$this->exitCode, "termSignal"=>$this->termSignal, "data"=>$this->data, ]; } public function write($data) { if(!is_null($this->process) && !is_null($this->process->stdin)) { return $this->process->stdin->write($data); } } /** * Wrapper for \React\Promise\Promise::then() * @param callable|null $onFulfilled * @param callable|null $onRejected * @return PromiseInterface */ public function then(callable $onFulfilled=null,callable $onRejected=null) { return $this->Promise()->then($onFulfilled,$onRejected); } /** Handler Methods */ public function onDone($exitCode=null,$termSignal=null) { $this->done = true; $this->running = false; $this->ready = false; $this->exitCode = $exitCode; $this->termSignal = $termSignal; $this->queueWorkerOnDone($exitCode,$termSignal); $this->deferred->resolve(true); if(!is_null($this->onDone)) { $handler = $this->onDone; $handler($this,$exitCode,$termSignal); } } public function onError($chunk=null) { $this->queueWorkerOnError($chunk); if(!is_null($this->onError)) { $handler = $this->onError; $handler($this,$chunk); } } public function onData($chunk=null) { $data = !empty($this->messageBuffer) ? $this->messageBuffer : ""; $data.=$chunk; if(($message = Message::fromPacket($data)) !== false) { $chunk = $data; $this->messageBuffer = null; } else { $this->messageBuffer = $data; return; } $this->queueWorkerOnData($chunk); if(!is_null($this->onData)) { $handler = $this->onData; $handler($this,$chunk); } } public function onStart() { $this->running = true; $this->ready = false; $this->queueWorkerOnStart(); if(!is_null($this->onStart)) { $handler = $this->onStart; $handler($this); } } public function onStreamClose($type) { $this->queueWorkerOnStreanClose($type); if(!is_null($this->onStreamClose)) { $handler = $this->onStreamClose; $handler($this,$type); } } public function onStreamError($type,$chunk=null) { $this->queueWorkerOnStreamError($type,$chunk); if(!is_null($this->onStreamError)) { $handler = $this->onStreamError; $handler($this,$type,$chunk); } } /** * Set the stdout/stderr channel listening * @return void */ private function setupProcessChannels() { $this->process->stdout->on('data', function($chunk) { $this->onData(trim($chunk)); }); $this->process->stdout->on('error', function($chunk) { $this->onStreamError("stdout",$chunk); }); $this->process->stdout->on('close', function() { $this->onStreamClose("stdout"); }); $this->process->stderr->on('data', function($chunk) { $this->onError(trim($chunk)); }); $this->process->stderr->on('error', function($chunk) { $this->onStreamError("stderr",$chunk); }); $this->process->stderr->on('close', function() { $this->onStreamClose("stderr"); }); $this->process->on('exit', function($exitCode,$termSignal) { $this->onDone(); }); } /** * Set the value of meta * @param mixed $meta * @return self */ public function setMeta($meta) { $this->meta = $meta; $this->onData = dhGlobal::getVal($meta, "onData", null); $this->onError = dhGlobal::getVal($meta, "onError", null); $this->onStart = dhGlobal::getVal($meta, "onStart", null); $this->onDone = dhGlobal::getVal($meta, "onDone", null); $this->deferred = dhGlobal::getVal($meta, "deferred",null); $this->onStreamClose = dhGlobal::getVal($meta, "onStreamClose",null); $this->onStreamError = dhGlobal::getVal($meta, "onStreamError",null); if(is_null($this->deferred)) { $this->deferred = new Deferred(); } return $this; } public function jsonSerialize($array=null) { if(is_null($array)) { $array = $this->get(); } return $array; } public function __toString() { return json_encode($this); } private function queueWorkerOnStart() { if(is_null($this->queueWorker)) return false; dhGlobal::trace("[worker]",$this->getId(),"worker process started"); $this->queueWorker->onStart(); } private function queueWorkerOnDone($exitCode=null,$termSignal=null) { if(is_null($this->queueWorker)) return false; $this->queueWorker->onDone($exitCode,$termSignal); } private function queueWorkerOnError($chunk=null) { if(is_null($this->queueWorker)) return false; $lines = explode("\n",$chunk); if(!empty($lines)) { foreach($lines as $line) { dhGlobal::trace("[worker-error]",$this->getId(),":error:",$line); $this->queueWorker->onError($line); } } else { dhGlobal::trace("[worker-error]",$this->getId(),":error:"); $this->queueWorker->onError(); } } private function queueWorkerOnData($chunk=null) { if(is_null($this->queueWorker)) return false; $lines = explode("\n",$chunk); if(!empty($lines)) { foreach($lines as $line) { dhGlobal::trace("[worker]",$this->getId(),":data:",$line); $this->queueWorker->onData($line); } } } private function queueWorkerOnStreanClose($type) { if(is_null($this->queueWorker)) return false; $this->queueWorker->disable(); } private function queueWorkerOnStreamError($type,$chunk=null) { if(is_null($this->queueWorker)) return false; $this->queueWorker->disable(); } /** * * @param QueueWorker $queueWorker * @return WorkerProcess */ public static function forQueue(QueueWorker $queueWorker,Queue $queue) { $instance = new self([],$queueWorker,$queue); return $instance; } }src/process/traits/GettersSetters.php000064400000012465144761607050014041 0ustar00getId(); } public function getId() { return $this->getPid(); } /** * Get the value of pid * @return mixed */ public function getPid() { return $this->pid; } public function isDone() { return $this->done; } public function isRunning() { return $this->running; } public function isReady() { return $this->ready; } /** * Get the value of command * @return mixed */ public function getCommand() { return $this->command; } /** * Get the value of args * @return mixed */ public function getArgs() { return $this->args; } /** * Get the value of meta * @return mixed */ public function getMeta() { return $this->meta; } /** * Return the processes's stdOut stream interface * @return \React\Stream\ReadableStreamInterface */ public function stdOut() { return $this->process->stdout; } /** * Return the processes's stdErr stream interface * @return \React\Stream\ReadableStreamInterface */ public function stdErr() { return $this->process->stderr; } /** * Return the processes's stdIn stream interface * @return \React\Stream\WritableStreamInterface */ public function stdIn() { return $this->process->stdin; } /** * Return the Deferred * @return Deferred */ public function Deferred() { return $this->deferred; } /** * Return the Promise (if exists) * @return Promise|false */ public function Promise() { return !is_null($this->deferred) ? $this->deferred->promise() : false; } public function setMetaData($key,$val="",$append=false) { if(strpos($key,".") !== false) { if($append) { $check = dhGlobal::getDot($this->metaData,$key); if(!is_null($check)) { if(is_array($check)) { $check[] = $val; $val = $check; } else { $narr = []; $narr[] = $check; $narr[] = $val; $val = $narr; } } } dhGlobal::dotAssign($this->metaData,$key,$val); } else { if(isset($this->metaData[$key]) && $append) { if(is_array($this->metaData[$key])) { $this->metaData[$key][] = $val; } else { $temp = $this->metaData[$key]; $array[$key] = []; $array[$key][] = $temp; $array[$key][] = $val; } } else { $this->metaData[$key] = $val; } } return $this; } public function getMetaData($key=null,$default=null,$exists=false) { if(is_null($key)) { if($exists) { return !empty($this->metaData) ? true : false; } else { return !empty($this->metaData) ? $this->metaData : $default; } } if(strpos($key,".") !== false) { $uniqueid = uniqid("getArray",true); if(($check = dhGlobal::getDot($this->metaData,$key,$uniqueid)) !== $uniqueid) { return $exists ? true : $check; }; } if($exists) { return isset($this->metaData[$key]); } else { return isset($this->metaData[$key]) ? $this->metaData[$key] : $default; } } public function metaDataExists($key) { return $this->getMetaData($key,null,true); } /** * Set the value of running * @param bool $running * @return self */ public function setRunning($running) { $this->running = $running; return $this; } /** * Set the value of done * @param bool $done * @return self */ public function setDone($done) { $this->done = $done; return $this; } /** * Set the Deferred that will be resolved on completion * @param Deferred $deferred * @return $this */ public function setDeferred(Deferred $deferred) { $this->deferred = $deferred; return $this; } /** * Set the value of pid * @param mixed $pid * @return self */ public function setPid($pid) { $this->pid = $pid; return $this; } /** * Set the value of command * @param mixed $command * @return self */ public function setCommand($command) { $this->command = $command; return $this; } /** * Set the value of args * @param mixed $args * @return self */ public function setArgs($args) { $this->args = $args; return $this; } }src/queue/Queue.php000064400000050650144761607050010274 0ustar00id = uniqid(); if(is_null($config)) { $config = new Config(); } $this->config = $config; $this->deferred = new Deferred(); if($this->get("visualize",false)) { $this->template = new Template(); $this->progress = new QueueStatus(" >> "); $this->promise()->then(function() { $this->updateProgress(); $this->progress->stop(); }); if($this->get("extendedBar",false)) { $this->progress->showExec(true); $this->progress->setThreads($this->get("numWorkers",1),$this->get("threadTemplate")); } StdOut::progressBar($this->progress); } } public function progress() { return $this->progress(); } public function __destruct() { if(!is_null($this->progress)) { $this->progress->stop(); } } public function updateInterval($interval=0.5) { if($this->get("visualize",false) && !is_null($this->progress)) { $this->progress->updateInterval($interval); } return $this; } /** * Add work to the Queue * @param Work $work * @return Work * @throws UnexpectedValueException * @throws RuntimeException * @throws Exception */ public function queue(Work $work) { if(is_null($this->deferred)) { $this->deferred = new Deferred(); } $this->work[$work->getId()] = $work; $this->moveToQueued($work); $maxQueued = intval($this->get("maxQueued",0)); if(empty($maxQueued)) { $maxQueued = false; } else { $queueSpace = $maxQueued - count($this->queued); } if($maxQueued === false || $queueSpace <= 1) { $this->checkWorkers(); $this->checkMaxQueued(); } if(!$this->hasStarted && $this->get("visualize",false) && count($this->queued)%250 == 0) { $this->updateProgress(true); } //$this->total++; return $work; } public function reset() { $this->deferred=null; $this->hasStarted = false; $this->queued=[]; $this->active=[]; $this->done=[]; $this->work=[]; return $this; } /** * Get the next queued item, or false if nothing is queued * @return Work|false */ public function next() { $this->hasStarted = true; if(!empty($this->queued)) { $work = array_shift($this->queued); $this->moveToActive($work); return $work; } return false; } public function promise() { if(is_null($this->deferred)) { $this->deferred = new Deferred(); $this->deferred->resolve([]); } return $this->deferred->promise(); } public function wait() { $this->log("wait called"); $this->isWaiting=true; $this->isDone=false; $this->promise()->then(function() use (&$isDone) { $this->isDone=true; }); $this->setupTimers(); Loop::addPeriodicTimer(0.1,function($timer) { if($this->isDone) { $this->stopTimers(); Loop::cancelTimer($timer); Loop::stop(); } else { $this->checkWorkers(); if($this->workerCount() <= 0) { if(is_null($this->workerCountEmptySince)) { $this->workerCountEmptySince = microtime(true); } if((microtime(true)-$this->workerCountEmptySince) > 5) { $this->isDone=true; } } else { $this->workerCountEmptySince = null; } } }); while(!$this->isDone) { Loop::run(); } //\React\Async\await($this->promise()); return $this; } public function collect() { $this->wait(); return $this->work; } private function checkMaxQueued() { $this->setupTimers(); $maxQueued = intval($this->get("maxQueued",0)); if(empty($maxQueued)) { $maxQueued = 0; } if($maxQueued>0 && count($this->queued) >= $maxQueued) { Loop::addPeriodicTimer(0.1,function($timer) use ($maxQueued) { if($maxQueued>0 && count($this->queued) >= $maxQueued) { } else { Loop::stop(); } }); Loop::run(); } } public function workerCount() { return count($this->queueWorkers); } public function workerActiveCount() { return count($this->queueWorkersActive); } public function workerEnabledCount() { return count($this->queueWorkersEnabled); } public function workerDisabledCount() { return count($this->queueWorkersDisabled); } public function checkWorkers() { $queueCount = count($this->getQueued()); $tempMax = min($this->get("numWorkers",1),$queueCount); //dhGlobal::outLine("checkWorkers: ",$queueCount,$tempMax,"w:",$this->workerCount(),"e",$this->workerEnabledCount(),"d",$this->workerDisabledCount()); foreach($this->queueWorkers as $k=>$qw) { if($qw->get("disabled",false)) { if($qw->getStoppedWork()) { $this->moveToQueued($qw->getStoppedWork()); dhGlobal::outLine("re-queing",$qw->getStoppedWork()->getId()); } dhGlobal::outLine("Removing disabled queueWorker.."); unset($this->queueWorkers[$k]); } } while($this->workerCount() < $tempMax) { $queueWorker = new QueueWorker($this); $queueWorker->enable(); $this->queueWorkers[] = $queueWorker; } while($this->workerEnabledCount() < $tempMax && $this->workerDisabledCount() > 0) { $qw = array_shift($this->queueWorkersDisabled); $qw->enable(); dhGlobal::outLine("Enabling a disabled queueWorker.."); } } public function id() { return $this->id; } /** @return bool true if queued array is not empty, false if empty */ public function hasQueued(){ return !empty($this->queued); } /** @return bool true if the active array is not empty, false if empty */ public function hasActive(){ return !empty($this->active); } /** @return bool true if the done array is not empty, false if empty */ public function hasDone(){ return !empty($this->done); } /** * Retrieve the work record by ID * @param string $workId * @return Work|false */ public function getWork($workId) { return isset($this->work[$workId]) ? $this->work[$workId] : false; } /** * Retrieve full work array * @return Work[]|false */ public function getAllWork() { return !empty($this->work) ? $this->work : false; } /** @return bool true if $work is in the active state, false otherwise */ public function workIsActive(Work $work) { return $this->getActive($work->getId()) !== false ? true : false; } /** @return bool true if $work is in the queued state, false otherwise */ public function workIsQueued(Work $work) { return $this->getActive($work->getId()) !== false ? true : false; } /** @return bool true if $work is in the done state, false otherwise */ public function workIsDone(Work $work) { return $this->getDone($work->getId()) !== false ? true : false; } /** * Get the ueue, or get a Work from the queue by ID * @param string|null $workId * @return Work[]|false|Work */ public function getQueued($workId=null) { if(!is_null($workId)) { return dhGlobal::getVal($this->queued,$workId,false); } return $this->queued; } /** * Get the active queue, or get a Work from the queue by ID * @param string|null $workId * @return Work[]|false|Work */ public function getActive($workId=null) { if(!is_null($workId)) { return dhGlobal::getVal($this->active,$workId,false); } return $this->active; } /** * Get the done queue, or get a Work from the queue by ID * @param string|null $workId * @return Work[]|false|Work */ public function getDone($workId=null) { if(!is_null($workId)) { return dhGlobal::getVal($this->done,$workId,false); } return $this->done; } public function status() { $workers = count($this->queueWorkers); $activeWorkers = 0; foreach($this->queueWorkers as $k => $queueWorker) { if($queueWorker->get("active",false)) { $activeWorkers++; } } return [ "done"=>count($this->getDone()), "expected"=>$this->expected, "active"=>count($this->getActive()), "queued"=>count($this->getQueued()), "workers"=>$workers, "activeWorkers"=>$activeWorkers, "threadData"=>$this->queueWorkers ]; } public function updateProgress($force=false) { if(!$force) { if(!$this->get("visualize",false) || $this->workerCount()<=0 || is_null($this->progress)) { return; } } $this->progress->update(count($this->done),count($this->active),count($this->queued),$this->expected); if(!$force) { $this->progress->setThreadData($this->queueWorkers); if(!is_null($this->lastActivity)) { $lastActivityDiff = microtime(true)-$this->lastActivity; $lastIdleDisplay = microtime(true)-$this->lastIdleUpdate; $maxIdleTime = $this->get("maxIdleTime",300); $idleUpdateTime = $this->get("idleUpdateInterval",60); if($maxIdleTime >0 && $lastActivityDiff >= $maxIdleTime) { $time = round(microtime(true)-$this->lastActivity); $dtF = new \DateTime('@0'); $dtT = new \DateTime("@$time"); $str = dhGlobal::dateIntervalToElapsed($dtF->diff($dtT),true,false,2,""); dhGlobal::outLine("it has been",$str,"since anything happened.. closing"); if($this->isWaiting) { $this->isDone=true; } else { exit(); } } elseif($idleUpdateTime >0 && min($lastActivityDiff,$lastIdleDisplay) >= $idleUpdateTime) { $this->lastIdleUpdate = microtime(true); $this->idleUpdate(); } } } else { $this->progress->draw(); } } public function touchLastActivityTime() { $this->lastActivity = microtime(true); } public function moveToQueued(Work $work) { $work->onWorkQueued(); return $this->moveTo($work,$this->queued); } public function moveToActive(Work $work) { $work->onWorkStart(); return $this->moveTo($work,$this->active); } public function moveToDone(Work $work) { $w = $this->moveTo($work,$this->done); if($this->get("discardOnDone",false)) { $this->done[$work->getId()] = null; unset($this->work[$work->getId()]);// = null; } return $w; } public function workOnDone(Work $work,$message=null) { $this->logWork($work,"workOnDone"); $this->moveToDone($work); if(!is_null($message)) { $work->done($message->getDataObject()); } else { $work->done(new Result([],$work->getId(),false)); } } public function workOnError(Work $work,$message=null,$workerClosed=false) { if($workerClosed) { $this->logWork($work,"workOnError","worker closed..."); if($this->get("retriesOnError",0)) { dhGlobal::debug("Work stopped due to worker close, handling with retry process"); $this->handleWorkOnError($work,$message); } else { $this->moveToQueued($work); dhGlobal::debug("Work re-queued due to worker close"); } } else { $e = new \Exception(); //dhGlobal::debug("workOnError called",$work->getId(),$e->getTraceAsString()); //$this->logWork($work,"workOnError called",$e->getTraceAsString()); $this->handleWorkOnError($work,$message); } } private function handleWorkOnError(Work $work,$message=null) { $this->logWork($work,"handleWorkOnError",!is_null($message) ? $message : ""); $retryCount = isset($this->retryCounts[$work->getId()]) ? $this->retryCounts[$work->getId()] : 0; $retryCount++; $retriesOnError = $this->get("retriesOnError",0); if($retriesOnError>0 && $retryCount<=$retriesOnError) { dhGlobal::trace("[retry]","retry count for",$work->getId(),"is",$retryCount); $this->logWork($work,"RETRY #".$retryCount); $this->retryCounts[$work->getId()] = $retryCount; $this->moveToQueued($work); } else { //dhGlobal::debug("moveToDone",$work->getId()); $this->moveToDone($work); if(!is_null($message) && is_object($message)) { $work->error($message->getErrorObject()); } else { $work->error(new Result([],$work->getId(),true)); } } } private function moveTo(Work $work,&$destinationArray) { $this->lastActivity = microtime(true); if(!isset($this->work[$work->getId()])) { //throw new \Exception("Work not found in work array"); dhGlobal::error("[work]",$work->getId(),"Not found in work array, unable to move to other array"); return; } unset($this->queued[$work->getId()]); unset($this->active[$work->getId()]); unset($this->done[$work->getId()]); $destinationArray[$work->getId()] = &$this->work[$work->getId()]; if(!$this->hasActive() && !$this->hasQueued()) { if(!is_null($this->deferred)) { //we need to resolve the QueuePromise after the final 'work' promise, otherwise we will resolve the QueuePromise before the WorkPromise is processed. $work->always(function() { if(!$this->hasQueued()) { $this->deferred->resolve($this->done); } }); } } $this->updateProgress(); return $work; } private function log(...$args) { array_unshift($args,"[QUEUE ]"); $this->_log(...$args); } private function logWork(Work $work,...$args) { array_unshift($args,"[ WORK ]",":".$work->getId().":"); $this->_log(...$args); } private function _log(...$args) { if($this->get("log",false) !== false) { if(is_null($this->logger)) { $this->logger = new dhOut(false,$this->get("log")); } $this->logger->line(...$args); } } public static function callableWork($callable,...$args) { $work = new Work($callable); $work->args($args); $work->asJson(false); return $work; } private function threadStatusDisplay() { $this->touchLastActivityTime(); $templateFormat = "{id} | {lastTime|since|timeFormat|pad:8} | Mem:{memUsed|formatBytes} / done:{workDone|pad:3} | {description|max:100}"; $templateFormat = $this->get("summaryTemplate",$templateFormat); dhGlobal::outLine("[THREAD STATUS UPDATE]"); foreach($this->queueWorkers as $queueWorker) { $line = $this->template->parse($templateFormat,$queueWorker->get()); dhGlobal::outLine($line); //dhGlobal::outLine($queueWorker->get()); } dhGlobal::outLine("[/THREAD STATUS UPDATE]"); } private function idleUpdate() { $status = $this->status(); $templateFormat = "Queued: {queued}, Active: {active} | Workers: {workers} total, {activeWorkers} active"; $templateFormat = $this->get("idleUpdateTemplate",$templateFormat); dhGlobal::outLine($this->template->parse($templateFormat,$status)); } private function setupTimers() { if(is_null($this->timerProgress)) { $this->timerProgress = Loop::addPeriodicTimer(0.5,function($t) { $this->updateProgress(); }); } if(is_null($this->timerThreadUpdate)) { $this->timerThreadUpdate = Loop::addPeriodicTimer($this->get("threadStatusInterval",60),function($t) { if(!$this->isDone && $this->get("visualize",false)) { $this->threadStatusDisplay(); } }); } } private function stopTimers() { if(!is_null($this->timerProgress)) { Loop::cancelTimer($this->timerProgress); } if(!is_null($this->timerThreadUpdate)) { Loop::cancelTimer($this->timerThreadUpdate); } } public function queueWorkerOnEnabled(QueueWorker &$queueWorker) { unset($this->queueWorkersDisabled[$queueWorker->getId()]); $this->queueWorkersEnabled[$queueWorker->getId()] = &$queueWorker; } public function queueWorkerOnActivated(QueueWorker &$queueWorker) { $this->queueWorkersActive[$queueWorker->getId()] = &$queueWorker; } public function queueWorkerOnDisabled(QueueWorker &$queueWorker) { unset($this->queueWorkersEnabled[$queueWorker->getId()]); //$this->queueWorkersDisabled[$queueWorker->getId()] = &$queueWorker; } public function queueWorkerOnDeactivated(QueueWorker &$queueWorker) { unset($this->queueWorkersActive[$queueWorker->getId()]); } /** * Get the value of expected * @return int */ public function getExpected() { return $this->expected; } /** * Set the value of expected * @param int $expected * @return self */ public function setExpected($expected) { $this->expected = $expected; return $this; } /** * Get the overall config * @param mixed $config * @return Config|$this */ public function config($config=null) { if(is_null($config)) { return $this->config; } else { $this->config = $config; return $this; } } public function getQueueConfig() { return $this->config; } public function get($key,$default=null) { return $this->config->get($key,$default); } public function set($key,$value) { $this->config->set($key,$value); return $this; } }src/queue/QueueLogger.php000064400000021772144761607050011437 0ustar00"QQ-", "worker"=>"QW-", "process"=>"WP-", "client"=>"WC-", ]; public static function setConfig($config) { static::$config = $config; } public static function getConfig() { return static::$config; } public static function initSettings($logDir=null,$logPerWorker=null) { if(!is_null($logDir) && !empty($logDir)) { static::logDir($logDir); } if(!is_null($logPerWorker)) { static::logPerWorker($logPerWorker); } if(!static::logFilePrefix()) { static::logFilePrefix(date("YmdHis")); } static::initLogDir(); } /** * @param QueueWorker|WorkerProcess|\Worker|Queue $object * @return void */ public static function setObject($object,$logDir=null,$logPerWorker=null) { static::$object = $object; static::initSettings($logDir,$logPerWorker); if($object instanceof \Worker) { } else { static::setConfig($object->getQueueConfig()); static::setLogFileByObject($object); } } public static function initClient($clientId=null,$logFile=null) { static::setConfig(new Config()); if(!empty($logFile)) { static::clientId($clientId); static::$logFiles[$clientId]["file"] = $logFile; static::$logFiles[$clientId]["error"] = str_replace(".log","_error.log",$logFile); static::$logDir = true; ini_set("log_errors",true); ini_set("dispaly_errors","off"); ini_set('error_log', static::getErrorLogFile($clientId)); dhGlobal::logger("debugger",dhGlobal::LOG_ALL,false,static::getLogFile($clientId)); dhGlobal::set("out",new dhOut(false,static::getLogFile($clientId),true,"date:H:i:s"," ($clientId:stdout) ")); dhGlobal::set("asyncLog",true); } } public static function initLogDir() { if(is_null(static::$logDir)) { return false; } elseif(static::$logDir === true) { return true; } if(!is_dir(static::$logDir)) { exec("mkdir -p ".static::$logDir); } } public static function logPerWorker($logPerWorker=null) { if(!is_null($logPerWorker)) { static::$logPerWorker = $logPerWorker; } return static::$logPerWorker; } public static function logDir($logDir=null) { if(!is_null($logDir)) { static::$logDir = $logDir; } return static::$logDir; } public static function clientId($clientId=null) { if(!is_null($clientId)) { static::$clientId = $clientId; } return static::$clientId; } public static function logFilePrefix($logFilePrefix=null) { if(!is_null($logFilePrefix)) { static::$logFilePrefix = $logFilePrefix; } return static::$logFilePrefix; } /** * * @param mixed $level * @param mixed $args * @return false|void */ public static function log($object,$level,...$args) { if($object instanceof \Worker) { $logArgs = $args; array_shift($logArgs); $object->sendLogMessage(implode(" ",$logArgs),$level); } if(!static::$logDir && !static::$clientId) { return false; } $fileName = static::getLogFile($object); if($level == "error") { $fileName = static::getErrorLogFile($object); } $level = "|".dhGlobal::padLeft(strtoupper($level),7," ")." | "; array_unshift($args,$level); $line = static::argsToLine($args); file_put_contents($fileName,$line."\n",FILE_APPEND); return true; } public static function debug($what,...$args) { if(!static::$config->get("logLevels.debug",false)) { return false; } array_unshift($args,static::getPrefix($what)); return static::log($what,"debug",...$args); } public static function msgSend($what,...$args) { if(!static::$config->get("logLevels.comms",false)) { return false; } array_unshift($args," --> "); array_unshift($args,static::getPrefix($what)); return static::log($what,"comms",...$args); } public static function msgRecv($what,...$args) { if(!static::$config->get("logLevels.comms",false)) { return false; } array_unshift($args," <-- "); array_unshift($args,static::getPrefix($what)); return static::log($what,"comms",...$args); } public static function info($what,...$args) { if(!static::$config->get("logLevels.info",false)) { return false; } array_unshift($args,static::getPrefix($what)); return static::log($what,"info",...$args); } public static function notice($what,...$args) { if(!static::$config->get("logLevels.notice",false)) { return false; } array_unshift($args,static::getPrefix($what)); return static::log($what,"notice",...$args); } public static function warning($what,...$args) { if(!static::$config->get("logLevels.warning",false)) { return false; } array_unshift($args,static::getPrefix($what)); return static::log($what,"warning",...$args); } public static function error($what,...$args) { if(!static::$config->get("logLevels.error",false)) { return false; } array_unshift($args,static::getPrefix($what)); return static::log($what,"error",...$args); } public static function getPrefix($what) { if($what instanceof Queue) { return "QQ-".$what->id(); } if($what instanceof QueueWorker) { return "QW-".$what->getId();; } if($what instanceof WorkerProcess) { return "WP-".$what->id(); } if($what instanceof \Worker) { return "WC-".$what->id(); } if(!is_object($what)) { if(is_array($what)) { return implode(" ",$what); } return isset(static::$logPrefix[$what]) ? static::$logPrefix[$what] : $what; } } public static function getLogFile($id) { if(is_object($id)) { $id = $id->id(); } if(isset(static::$logFiles[$id])) { return static::$logFiles[$id]["file"]; } return false; } public static function getErrorLogfile($id) { if(is_object($id)) { $id = $id->id(); } if(isset(static::$logFiles[$id])) { return static::$logFiles[$id]["error"]; } return false; } public static function setLogFileByObject($what) { $filePrefix = static::$logDir.DIRECTORY_SEPARATOR.static::logFilePrefix()."_"; if($what instanceof Queue) { $workerLog = $filePrefix.$what->id().".log"; $errorLog = $filePrefix.$what->id()."_error.log"; } else { if(static::logPerWorker()) { $workerLog = $filePrefix.$what->queueId()."_".$what->queueWorkerId().".log"; $errorLog = $filePrefix.$what->queueId()."_".$what->queueWorkerId()."_error.log"; } else { $workerLog = $filePrefix.$what->queueId()."_worker.log"; $errorLog = $filePrefix.$what->queueId()."_worker_error.log"; } } static::$logFiles[$what->id()]["file"] = $workerLog; static::$logFiles[$what->id()]["error"] = $errorLog; if($what instanceof QueueWorker) { static::$logFiles["default"] = static::$logFiles[$what->id()]; } } protected static function argsToLine($args) { $line = date("H:i:s")." "; $parts = []; foreach($args as $arg) { $parts[] = static::argToString($arg)." "; } return $line.implode(" ",$parts); } protected static function argToString($arg) { if(is_array($arg) || is_object($arg)) { return print_r($arg,true); } else { return $arg; } } }src/queue/QueueStatus.php000064400000007532144761607050011501 0ustar00setThreads($threads); } $this->bar = new Bar($this->barFormat); $this->bar->set("name",$name); $this->updateBar(); $this->statusBar = new StatusBar(0.5,$this->bar); $this->statusBar->addBar($name,$this->bar); } public function getBar() { return $this->statusBar; } public function setThreadData($queueWorkers) { $this->threadData = []; foreach($queueWorkers as $queueWorker) { $this->threadData[] = $queueWorker->get(); } } public function updateThread($threadNumber,$key,$value) { $this->threadData[$threadNumber][$key] = $value; } public function update($done,$active=0,$queued=0,$expected=0) { if(is_null($this->timeStarted)) { $this->statusBar->start(); $this->timeStarted = microtime(true); } if($expected==0) { $expected = $done+$queued+$active; } $this->total = $expected; $this->remaining = $this->total - $done; $this->active = $active; $this->queued = $queued; if($done>0) { $this->percent = ($done / $this->total) * 100; } else { $this->percent = 0; } $this->updateBar(); } public function stop() { $this->statusBar->stop(); } public function draw() { $this->statusBar->draw(); } private function updateBar() { $this->bar->set("percent", number_format($this->percent,2)); $this->bar->set("remaining", $this->remaining); $this->bar->set("total", $this->total); $this->bar->set("active", $this->active); $this->bar->set("queued", $this->queued); $this->bar->set("threadData", $this->threadData); } public function updateInterval($updateInterval) { $this->statusBar->updateInterval($updateInterval); return $this; } public function showExec($showExec) { $this->statusBar->showExec($showExec); return $this; } public function showMem($showExec) { $this->statusBar->showMem($showExec); return $this; } public function setBarFormat($barFormat) { $this->barFormat = $barFormat; $this->bar->barFormat($this->barFormat); } public function setThreads($threads,$threadTemplate=null) { $this->threadData = array_fill(0,$threads,["lastTime"=>null,"status"=>"","done"=>0,"mem"=>0]); if(is_null($threadTemplate)) { $this->bar->set("threadDataTemplate","[{lastTime|since|timeFormat} {workDone} {status|max:10|pad:10}]"); } else { $this->bar->set("threadDataTemplate",$threadTemplate); } if(!$this->threadsAdded) { $this->barFormat .= " {threadData}"; $this->threadsAdded=true; $this->bar->barFormat($this->barFormat); } } }src/queue/QueueWorker.php000064400000037431144761607050011470 0ustar00null, "bootstrapFile"=>null, "bootstrapCallable"=>null, "timeout"=>300, "maxWork"=>0, "logDir"=>null, "lastTime"=>null, "workDone"=>0, "pingTime"=>null, "status"=>"", "memUsed"=>0, "enabled"=>true, "disabled"=>false, "active"=>false, "failures"=>0, "reassigns"=>0, "description"=>"" ]; public function __destruct() { if(!is_null($this->timer)) { Loop::cancelTimer($this->timer); } } public function __construct(Queue $queue) { $this->id = uniqid(); $this->set("id",$this->id); $this->queue = $queue; $this->set("bootstrapFile" , $this->getConfig("bootstrapFile")); $this->set("bootstrapCallable" , $this->getConfig("bootstrapCallable")); $this->set("timeout" , $this->getConfig("timeout")); $this->set("maxWork" , $this->getConfig("maxWork")); $this->set("logDir" , $this->getConfig("logDir")); $this->setLogging($this->get("logDir",false),$this->getConfig("logPerWorker")); } public function enable() { $this->set("enabled",true); $this->set("disabled",false); $this->queue->queueWorkerOnEnabled($this); $this->launch(); Loop::addPeriodicTimer(0.1,function($timer) { $this->timer = $timer; $this->check(); if(!$this->get("enabled",false) && !$this->get("active",false)) { Loop::cancelTimer($this->timer); $this->timer = null; } }); return $this; } public function disable() { $this->set("enabled","false"); $this->set("disabled",true); $this->stopWorkerProcess(); $this->queue->queueWorkerOnDisabled($this); $this->queue->queueWorkerOnDeactivated($this); return $this; } private function check() { //dhGlobal::outLine("[QW] check.."); if(!is_null($this->workerProcess)) { if(!$this->workerProcess->isRunning() || $this->workerProcess->isDone()) { $this->workerProcess = null; } else { $timeout = $this->get("timeout",0); if(!is_null($this->work) && ($workTimeout = $this->work->getMetaData("timeout",null)) !== null) { $timeout = $workTimeout; } if($timeout>0 && $timeout< (microtime(true) - $this->get("lastTime"))) { $this->logProcessError("timeout reached.. closing worker after timeout value of ",$timeout,"seconds elapsed"); if(!is_null($this->work)) { $this->queue->workOnError($this->work,null,false); } $this->disable(); } else { /*$pingDiff = microtime(true) - max($this->get("pingTime",0),$this->lastPing); if($pingDiff>5) { $this->sendPing(); }*/ } } } if(is_null($this->workerProcess) && $this->get("enabled",false)) { if($this->queue->hasQueued()) { $this->launch(); } } } private function launch($force=false) { if(!is_null($this->workerProcess) && $force) { $this->stopWorkerProcess(); } elseif(!is_null($this->workerProcess)) { return false; } $this->workerProcess = WorkerProcess::forQueue($this,$this->queue); QueueLogger::setObject($this->workerProcess); $this->workerProcess->start(); $this->set("lastTime",microtime(true)); $this->set("workDone",0); } private function stopWorkerProcess() { if(!is_null($this->workerProcess)) { if($this->workerProcess instanceof WorkerProcess) { $this->workerProcess->then(function($workerProcess) { $this->logProcessInfo("stopped"); },function($workerProcess) { $this->logProcessInfo("stopped"); }); } $this->workerProcess->terminate(); } if(!is_null($this->work)) { $this->stoppedWork = $this->work; } $this->workId = null; $this->work = null; $this->set("active",false); $this->queue->queueWorkerOnDeactivated($this); $this->workerProcess = null; } public function handleInit(Message $message) { $this->logProcessDebug("initialized"); $this->set("lastTime",microtime(true)); $this->setDisplay(""); $this->set("memUsed",$message->mem()); } public function handleAck(Message $message) { $this->logProcessDebug("acknowledged"); $this->set("lastTime",microtime(true)); $this->set("memUsed",$message->mem()); } public function handlePong(Message $message) { $this->logProcessDebug("pong"); $this->set("pingTime",microtime(true)); $this->set("memUsed",$message->mem()); $this->queue->touchLastActivityTime(); } public function handleLog(Message $message) { $this->set("lastTime",microtime(true)); $this->set("memUsed",$message->mem()); $this->callOnLog($this->workerProcess,$message); } public function handleReady(Message $message) { $this->set("lastTime",microtime(true)); $this->setDisplay(""); $this->set("memUsed",$message->mem()); if($this->get("maxWork",0)>0 && $this->get("workDone",0) >= $this->get("maxWork",0)) { $this->logInfo("maxWork exceeded, relaunching"); $this->launch(true); } else { if(!is_null($this->workId)) { $this->set("reassigns",$this->get("reassigns",0)+1); if($this->get("reassigns",0)>3) { $this->queue->workOnError($this->work,null,false); $this->logError("got the same work 3 times","workId:".$this->workId); $this->work=null; $this->workId=null; $this->sendNoWork(); } else { $this->logDebug("re-assigedWork",$this->work->getId()); $this->sendWork($this->work); } } elseif(($work = $this->queue->next()) !== false) { $this->logDebug("assinged work",$work->getId()); $this->work = $work; $this->workId = $work->getId(); $this->setDisplay();//$work->getMetaData("name",$work->getId())); $this->sendWork($this->work); $this->set("reassigns",0); } else { $this->logInfo("noWork, disabling"); $this->sendNoWork(); $this->disable(); } } } public function handleWork(Message $message) { $this->set("lastTime",microtime(true)); $this->set("memUsed",$message->mem()); if($message->isError()) { $this->queue->workOnError($this->work,$message,false); $this->work = null; $this->workId = null; //if killOnError... } else { $this->logProcessInfo("done with",$this->work->getId()); $this->queue->workOnDone($this->work,$message); $this->set("workDone",$this->get("workDone")+1); $this->work = null; $this->workId = null; } } //response handling from the WorkerProcess: public function onStart() { $this->logProcessInfo("started"); if(!is_null($this->get("bootstrapCallable",null))) { $this->sendBootstrap(); } $this->set("active",true); $this->queue->queueWorkerOnActivated($this); } public function onError($chunk=null) { if(!is_null($chunk)) { $this->logProcessError($this->workerProcess,"error:",$chunk); } else { $this->logProcessError($this->workerProcess,"error"); } } public function onDone($exitCode=null,$termSignal=null) { if(is_null($this->workerProcess)) { $workerId = "unknown"; } else { $workerId = $this->workerProcess->getId(); } dhGlobal::trace("[worker]",$workerId,"worker process exited"); $this->logProcessInfo("exited"); $this->setDisplay(""); $this->set("active",false); $this->queue->queueWorkerOnDeactivated($this); } public function onData($chunk=null) { if(($message = Message::fromPacket($chunk)) !== false) { if($message->getWorkId() != "log") { $this->logMsgRecv($message->toPacket()); } if($message->getWorkId() == "init") { $this->logProcessDebug("received",$message->getWorkId()); $this->logProcessDebug("messagePacket:",$message->toPacket()); $this->handleInit($message); } elseif($message->getWorkId() == "ack") { $this->logProcessDebug("received",$message->getWorkId()); $this->logProcessDebug("messagePacket:",$message->toPacket()); $this->handleAck($message); } elseif($message->getWorkId() == "pong") { $this->logProcessDebug("received",$message->getWorkId()); $this->logProcessDebug("messagePacket:",$message->toPacket()); $this->handlePong($message); } elseif($message->getWorkId() == "ready") { $this->logProcessDebug("received",$message->getWorkId()); $this->logProcessDebug("messagePacket:",$message->toPacket()); $this->handleReady($message); } elseif($message->getWorkId() == "log") { $this->handleLog($message); } elseif($message->getWorkId() == $this->workId) { $this->logProcessDebug("received",$message->getWorkId()); $this->logProcessDebug("messagePacket:",$message->toPacket()); $this->handleWork($message); } } } public function get($key=null,$default=null) { if(is_null($key)) { return $this->data; } return dhGlobal::getVal($this->data,$key,$default); } public function set($key,$value=null) { dhGlobal::dotAssign($this->data,$key,$value); return $this; } public function sendMessage($workId=null,$data=null) { $message = new Message(); $message->setWorkId($workId); $message->setData($data); dhGlobal::trace("sent","work",$message->toPacket()); $this->logDebug("sendMessage",$this->workerProcess->getId()," :: ",$message->getWorkId()); $this->logDebug("messagePacket:",$message->toPacket()); $this->logMsgSend($message->toPacket()); $this->workerProcess->write($message->toPacket()."\n"); } public function sendWork(Work $work) { $this->sendMessage($work->getId(),$work->getWork()); } public function sendNoWork() { $this->sendMessage("nowork",["nowork"=>true,"time"=>microtime(true)]); } public function sendPing() { $this->lastPing = microtime(true); $this->sendMessage("ping",["ping"=>true,"time"=>microtime(true)]); } public function sendBootstrap() { $this->sendMessage("init",["callable"=>["static","bootstrap"],"args"=>[$this->getConfig("bootstrapCallable")]]); } private function logProcessInfo(...$args) { QueueLogger::info($this,...$args); } private function logProcessError(...$args) { $message = new Message(); $message->setWorkId("log"); $message->setData(["level"=>"error","message"=>implode(" ",$args)]); $this->callOnLog($this->workerProcess,$message); QueueLogger::error($this,...$args); } private function logProcessDebug(...$args) { QueueLogger::debug($this,...$args); } private function logInfo(...$args) { QueueLogger::info($this,...$args); } private function logError(...$args) { QueueLogger::error($this,...$args); } private function logDebug(...$args) { QueueLogger::debug($this,...$args); } private function logMsgSend(...$args) { QueueLogger::msgSend($this,...$args); } private function logMsgRecv(...$args) { QueueLogger::msgRecv($this,...$args); } private function setLogging($logDir=null,$logPerWorker=false) { QueueLogger::setObject($this,$logDir,$logPerWorker); } /** * Get the queueId */ public function getQueueId() { return $this->queue->id(); } public function queueId() { return $this->getQueueId(); } public function queueWorkerId() { return $this->id(); } public function getStoppedWork() { return $this->stoppedWork; } /** * Get the value of work */ public function getWork() { return $this->work; } /** * Set the value of work * * @return self */ public function setWork($work) { $this->work = $work; return $this; } /** * Get the value of workId */ public function getWorkId() { return $this->workId; } /** * Set the value of workId * * @return self */ public function setWorkId($workId) { $this->workId = $workId; return $this; } /** * Get the value of id */ public function getId() { return $this->id; } public function id() { return $this->getId(); } /** * Set the value of id * * @return self */ public function setId($id) { $this->id = $id; return $this; } public function setDisplay($value=null) { $staticValues = ["","",""]; if(is_null($value) && !is_null($this->work)) { $description = $this->work->getMetaData("description",""); $value = $this->work->getMetaData("name",$this->workId); $this->set("status",$value); $this->set("description",$description); } else { $this->set("status",$value); $this->set("description",$value); } } public function getQueueConfig() { return $this->queue->getQueueConfig(); } public function getConfig($key=null,$default=null) { return $this->queue->get($key,$default); } public function setConfig($key,$value=null) { $this->queue->set($key,$value); return $this; } public function callOnLog($process,$message) { $callable = $this->getConfig("onLog"); if(is_callable($callable)) { $callable($process,$message); } elseif(is_null($callable)) { if($message->get("level") == "error") { dhGlobal::outLine("****",strtoupper($message->get("level")),"|","WC-".$process->getId(),"|",$message->get("message")); } } } }src/traits/TaskCallbacks.php000064400000011477144761607050012100 0ustar00addCallback($this->onStartCallback,$onStart); $this->addCallback($this->onQueueCallback,$onQueue); $this->addCallback($this->onDoneCallback,$onDone); $this->addCallback($this->onErrorCallback,$onError); $this->addCallback($this->onFinalCallback,$onFinal); return $this; } public function then($onDone=null,$onError=null) { $this->addCallback($this->onDoneCallback,$onDone); $this->addCallback($this->onErrorCallback,$onError); return $this; } public function lastly($onFinal=null) { $this->addCallback($this->onFinalCallback,$onFinal); } public function onStart($onStart=null) { $this->addCallback($this->onStartCallback,$onStart); return $this; } public function onQueue($onQueue=null) { $this->addCallback($this->onQueueCallback,$onQueue); return $this; } public function onDone($onDone=null) { $this->addCallback($this->onDoneCallback,$onDone); return $this; } public function onError($onError=null) { $this->addCallback($this->onErrorCallback,$onError); return $this; } private function callbackOnDone($result) { if(is_object($result) && $result instanceof Result) { $this->result = $result; $this->data = $result->result(); $this->error = $result->error(); } else { $this->data = $result; } $this->runCallbacks($this->onDoneCallback,$this->data,$this); if($this->showDone) { if(!$this->showDetailed) { static::display("done",$this->displayName()); } else { static::display("done",$this->displayName(),":",$this->data); } } $this->runCallbacks($this->onFinalCallback,$this->data,$this); $this->cleanupWork(); } private function callbackOnError($result) { if(is_object($result)) { if($result instanceof ExceptionResult || $result instanceof \Exception) { $this->error = $result; } elseif($result instanceof Result) { $this->result = $result; $this->data = $result->result(); $this->error = $result->error(); } elseif(method_exists($result,"getMessage")) { $this->error = $result->getMessage(); } } else { $this->error = $result; } if(!TaskQueue::get("errorExceptions",false) && $this->error instanceof \Exception) { $this->error = $this->error->getMessage(); } if(!empty($this->onErrorCallback)) { $this->runCallbacks($this->onErrorCallback,$this->error,$this); } elseif(!empty($this->onDoneCallback)) { $this->runCallbacks($this->onDoneCallback,null,$this); } if($this->showDone) { if(!$this->showDetailed) { static::display("error",$this->displayName()); } else { static::display("error",$this->displayName(),":",$this->error); } } $this->cleanupWork(); } private function callbackOnStart() { if($this->showStart) { static::display("start",$this->displayName()); } $this->runCallbacks($this->onStartCallback,$this); } private function callbackOnQueue() { if($this->showQueued) { static::display("queued",$this->displayName()); } $this->runCallbacks($this->onQueueCallback,$this); } private function runCallbacks($callback,...$args) { if(is_null($callback) || empty($callback)) { return; } if(is_array($callback)) { foreach($callback as $cb) { $cb(...$args); } } else { $callback(...$args); } } public function cleanupWork($force=false) { if(property_exists($this,"cleanupOnDone") && !is_null($this->cleanupOnDone) && $this->cleanupOnDone) { if(!is_null($this->work) && $this->work instanceof Work) { $this->work->destroy(); } $this->result = $this->error = $this->data = null; } } }src/traits/TaskDisplay.php000064400000005475144761607050011627 0ustar00 queued :"; public static $startPrefix = "> --- start :"; public static $donePrefix = "> *** done :"; public static $errorPrefix = "> !!! error :"; public static function display($action,$identifier,...$args) { $displayLen = stdOut::getCols(60); if($displayLen<20) { $displayLen=20; } $prefix = ""; if(in_array($action,static::$doneVerbs)) { $prefix = static::$donePrefix; } elseif(in_array($action,static::$startVerbs)) { $prefix = static::$startPrefix; } elseif(in_array($action,static::$queueVerbs)) { $prefix = static::$queuePrefix; } elseif(in_array($action,static::$errorVerbs)) { $prefix = static::$errorPrefix; } $argString = ""; if(!empty($args)) { foreach($args as $k=>$arg) { $args[$k] = StdOut::argToString($arg,true); } $argString = implode(" ",$args); if(strlen($argString) >= $displayLen) { $argString=substr($argString,0,$displayLen-4)." ..."; } } dhGlobal::outLine($prefix,$identifier,$argString); } public static function addStartVerb($verb) { static::$startVerbs[] = $verb; } public static function addQueueVerb($verb) { static::$queueVerbs[] = $verb; } public static function addDoneVerb($verb) { static::$doneVerbs[] = $verb; } public static function addErrorVerb($verb) { static::$errorVerbs[] = $verb; } public static function setStartPrefix($prefix) { static::$startPrefix = $prefix; } public static function setQueuePrefix($prefix) { static::$queuePrefix = $prefix; } public static function setDonePrefix($prefix) { static::$donePrefix = $prefix; } public static function setErrorPrefix($prefix) { static::$errorPrefix = $prefix; } public static function setQueueVerbs($verbs) { static::$queueVerbs = $verbs; } public static function setStartVerbs($verbs) { static::$startVerbs = $verbs; } public static function setDoneVerbs($verbs) { static::$doneVerbs = $verbs; } public static function setErrorVerbs($verbs) { static::$errorVerbs = $verbs; } }src/traits/TaskGettersSetters.php000064400000012251144761607050013177 0ustar00getOnDoneCallback() : $this->setOnDoneCallback($onDoneCallback); } public function onErrorCallback($onErrorCallback=null) { return is_null($onErrorCallback) ? $this->getOnErrorCallback() : $this->setOnErrorCallback($onErrorCallback); } public function onStartCallback($onStartCallback=null) { return is_null($onStartCallback) ? $this->getOnStartCallback() : $this->setOnStartCallback($onStartCallback); } public function onQueueCallback($onQueueCallback=null) { return is_null($onQueueCallback) ? $this->getOnQueueCallback() : $this->setOnQueueCallback($onQueueCallback); } /** * @param mixed $showDone * @return bool|self */ public function showDone($showDone=null) { return is_null($showDone) ? $this->getShowDone() : $this->setShowDone($showDone); } /** * @param mixed $showDone * @return bool|self */ public function showStart($showStart=null) { return is_null($showStart) ? $this->getShowStart() : $this->setShowStart($showStart); } /** * @param mixed $showDone * @return bool|self */ public function showQueued($showQueued=null) { return is_null($showQueued) ? $this->getShowQueued() : $this->setShowQueued($showQueued); } /** * @param mixed $showDone * @return bool|self */ public function showDetailed($showDetailed=null) { return is_null($showDetailed) ? $this->getShowDetailed() : $this->setShowDetailed($showDetailed); } public function setVerbosity($start=false,$queue=false,$done=false,$detail=false) { $this->showStart($start); $this->showQueued($queue); $this->showDone($done); $this->showDetailed($detail); } /** * Get the value of onDoneCallback * @return mixed */ public function getOnDoneCallback() { return $this->onDoneCallback; } /** * Set the value of onDoneCallback * @param mixed $onDoneCallback * @return self */ public function setOnDoneCallback($onDoneCallback) { $this->onDoneCallback = $onDoneCallback; return $this; } /** * Get the value of onErrorCallback * @return mixed */ public function getOnErrorCallback() { return $this->onErrorCallback; } /** * Set the value of onErrorCallback * @param mixed $onErrorCallback * @return self */ public function setOnErrorCallback($onErrorCallback) { $this->onErrorCallback = $onErrorCallback; return $this; } /** * Get the value of onStartCallback * @return mixed */ public function getOnStartCallback() { return $this->onStartCallback; } /** * Set the value of onStartCallback * @param mixed $onStartCallback * @return self */ public function setOnStartCallback($onStartCallback) { $this->onStartCallback = $onStartCallback; return $this; } /** * Get the value of onQueueCallback * @return mixed */ public function getOnQueueCallback() { return $this->onQueueCallback; } /** * Set the value of onQueueCallback * @param mixed $onQueueCallback * @return self */ public function setOnQueueCallback($onQueueCallback) { $this->onQueueCallback = $onQueueCallback; return $this; } /** * Get the value of showDone * @return mixed */ public function getShowDone() { return $this->showDone; } /** * Set the value of showDone * @param mixed $showDone * @return self */ public function setShowDone($showDone) { $this->showDone = $showDone; return $this; } /** * Get the value of showStart * @return mixed */ public function getShowStart() { return $this->showStart; } /** * Set the value of showStart * @param mixed $showStart * @return self */ public function setShowStart($showStart) { $this->showStart = $showStart; return $this; } /** * Get the value of showQueued * @return mixed */ public function getShowQueued() { return $this->showQueued; } /** * Set the value of showQueued * @param mixed $showQueued * @return self */ public function setShowQueued($showQueued) { $this->showQueued = $showQueued; return $this; } /** * Get the value of showDetailed * @return mixed */ public function getShowDetailed() { return $this->showDetailed; } /** * Set the value of showDetailed * @param mixed $showDetailed * @return self */ public function setShowDetailed($showDetailed) { $this->showDetailed = $showDetailed; return $this; } }src/work/Work.php000064400000030732144761607050007767 0ustar00id = uniqid(); $this->callable = $serializeableCallable; $this->args = dhGlobal::getVal($options, "args", null); $this->asJson = dhGlobal::getVal($options, "asJson", false); $this->onData = dhGlobal::getVal($options, "onData", null); $this->onError = dhGlobal::getVal($options, "onError", null); $this->onStart = dhGlobal::getVal($options, "onStart", null); $this->onDone = dhGlobal::getVal($options, "onDone", null); $this->bootstrap = dhGlobal::getVal($options, "bootstrap",null); $this->queue = dhGlobal::getVal($options, "queue", null); $this->deferred = dhGlobal::getVal($options, "deferred", null); $this->metaData = dhGlobal::getVal($options, "metaData", null); if(is_null($this->deferred)) { $this->deferred = new Deferred(); } } public function start() { return $this->run(); } public function destroy() { $this->id = null; $this->callable = null; $this->args = null; $this->asJson = null; $this->onData = null; $this->onError = null; $this->onStart = null; $this->onDone = null; $this->bootstrap = null; $this->queue = null; $this->deferred = null; $this->metaData = null; $this->result = null; $this->error = null; } public function getId() { return $this->id; } public function getPriority() { return $this->priority; } public function getResult() { return $this->result; } public function getError() { return $this->error; } public function isDone() { return $this->done; } public function isRunning() { return $this->running; } public function isReady() { return $this->ready; } public function hasSuccess() { return $this->success; } public function isSuccess() { return $this->success; } /** * @param mixed|null $args * @return mixed */ public function args($args=null) { if(is_null($args)) { return $this->args; } $this->args = $args; return $this; } /** * @param bool|null $asJson * @return bool */ public function asJson($asJson=null) { if(is_null($asJson)) { return $this->asJson; } if($asJson) { $asJson = true; } else { $asJson = false; } $this->asJson = $asJson; return $this; } public function done($result) { $this->success=true; $this->onWorkDone($result); } public function error($data=null) { $this->onWorkError($data); } public function terminate() { } public function getWork() { $packet = []; $packet["id"] = $this->id; $packet["callable"] = $this->callable; $packet["args"] = $this->args; $packet["asJson"] = $this->asJson; return $packet; } /** * Wrapper for \React\Promise\Promise::then() * @param callable|null $onFulfilled * @param callable|null $onRejected * @return \React\Promise\PromiseInterface */ public function then(callable $onFulfilled=null,callable $onRejected=null) { return $this->Promise()->then($onFulfilled,$onRejected); } public function always(callable $onFullfilledOrRejected=null) { return $this->Promise()->always($onFullfilledOrRejected); } /** * Making it easy to run work without having to queue.. creates a new queue for each run() call. * * Options * * bootstrap - the bootstrap definition to use with the newly created queue.. ommitted if using existing queue * * queue - optional instance of \boru\dhutils\async\Queue, instead of creating a new queue, re-uses an existing one. * @param array $options * @return $this */ public function run($options=[]) { $this->bootstrap = dhGlobal::getVal($options,"bootstrap",$this->bootstrap); $queue = dhGlobal::getVal($options,"queue",$this->queue); if(is_null($queue)) { $queue = dhGlobal::getAsyncQueue($this->bootstrap); } $queue->queue($this); return $this; } /** Handler Methods */ public function onWorkDone($result=null) { $this->result = $result; $this->done = true; $this->running = false; $this->ready = false; $resultData = $resultError = null; if(is_object($result) && $result instanceof Result) { $resultData = $result->result(); $resultError = $result->error(); } $this->deferred->resolve($this->result); if(!is_null($this->onDone)) { $handler = $this->onDone; $handler($this,$this->result); } } public function onWorkError($chunk=null) { if(!is_null($this->onError)) { $handler = $this->onError; $handler($this,$chunk); } $this->error = $chunk; if(!is_null($this->deferred)) { if($chunk instanceof Result) { if($chunk->isError()) { $e = new ExceptionResult($chunk); } else { $e = new ExceptionResult($chunk); $e->setMessage("Received workError with chunk: ".$chunk); } } else { $e = new \Exception("Received workError with chunk: ".$chunk); } $this->deferred->reject($e); } } public function onWorkData($chunk=null) { if(!is_null($this->onData)) { $handler = $this->onData; $handler($this,$chunk); } } public function onWorkStart() { $this->running = true; $this->ready = false; if(!is_null($this->onStart)) { $handler = $this->onStart; $handler($this); } } public function onWorkQueued() { $this->running = true; $this->ready = false; if(!is_null($this->onQueued)) { $handler = $this->onQueued; $handler($this); } } /** * Get the value of running * @return bool */ public function getRunning() { return $this->running; } /** * Set the value of running * @param bool $running * @return self */ public function setRunning($running) { $this->running = $running; return $this; } /** * Get the value of done * @return bool */ public function getDone() { return $this->done; } /** Set the value of done * @param bool $done * @return self */ public function setDone($done) { $this->done = $done; return $this; } /** * Set the Deferred that will be resolved on completion * @param Deferred $deferred * @return $this */ public function setDeferred(Deferred $deferred) { $this->deferred = $deferred; return $this; } /** * Get the value of deferred * * @return mixed */ public function getDeferred() { return $this->deferred; } /** @return \React\Promise\Promise */ public function Promise() { return $this->deferred->promise(); } /** * Get the value of queue * @return mixed */ public function getQueue() { return $this->queue; } /** * Set the value of queue * @param mixed $queue * @return self */ public function setQueue($queue) { $this->queue = $queue; return $this; } /** * Get the value of bootstrap * @return mixed */ public function getBootstrap() { return $this->bootstrap; } /** * Set the value of bootstrap * @param mixed $bootstrap * @return self */ public function setBootstrap($bootstrap) { $this->bootstrap = $bootstrap; return $this; } public function onStart(callable $callback) { $this->onStart = $callback; return $this; } public function onDone(callable $callback) { $this->onDone = $callback; return $this; } public function onData(callable $callback) { $this->onData = $callback; return $this; } public function onError(callable $callback) { $this->onError = $callback; return $this; } public function onQueued(callable $callback) { $this->onQueued = $callback; return $this; } public function setMetaData($key,$val="",$append=false) { if(strpos($key,".") !== false) { if($append) { $check = dhGlobal::getDot($this->metaData,$key); if(!is_null($check)) { if(is_array($check)) { $check[] = $val; $val = $check; } else { $narr = []; $narr[] = $check; $narr[] = $val; $val = $narr; } } } dhGlobal::dotAssign($this->metaData,$key,$val); } else { if(isset($this->metaData[$key]) && $append) { if(is_array($this->metaData[$key])) { $this->metaData[$key][] = $val; } else { $temp = $this->metaData[$key]; $array[$key] = []; $array[$key][] = $temp; $array[$key][] = $val; } } else { $this->metaData[$key] = $val; } } return $this; } public function getMetaData($key=null,$default=null,$exists=false) { if(is_null($key)) { if($exists) { return !empty($this->metaData) ? true : false; } else { return !empty($this->metaData) ? $this->metaData : $default; } } if(strpos($key,".") !== false) { $uniqueid = uniqid("getArray",true); if(($check = dhGlobal::getDot($this->metaData,$key,$uniqueid)) !== $uniqueid) { return $exists ? true : $check; }; } if($exists) { return isset($this->metaData[$key]); } else { return isset($this->metaData[$key]) ? $this->metaData[$key] : $default; } } public function metaDataExists($key) { return $this->getMetaData($key,null,true); } public static function fromCallable($callable,...$args) { $work = new self($callable); $work->args($args); return $work; } }src/worker/WorkerClient.php000064400000045656144761607050012017 0ustar00$v) { cwSetIfArg("log",$v,$logDir); cwSetIfArg("id",$v,$parentId); cwSetIfArg("qid",$v,$queueId); cwSetIfArg("wid",$v,$workerId); cwSetIfArg("ll",$v,$logSettings); cwPacketOrInclude($v,$preloadPackets,$includes); if($v == "debug") { $maxWaitBeforeExit=0; } } } if(!empty($includes)) { foreach($includes as $include) { if(file_exists($include)) { require_once $include; } } } $pparts = explode(DIRECTORY_SEPARATOR,__DIR__); $pparts = array_reverse($pparts); if($pparts[2] == "dhprocess" && $pparts[4] == "vendor") { require_once __DIR__."/../../../../autoload.php"; } else { require_once __DIR__."/../../vendor/autoload.php"; } /** * I understand completely that we shouldn't throw random ini_set definitions within libraries, but centos does weird things. */ if(php_sapi_name() == "cli") { ini_set("memory_limit",-1); } if(is_null($parentId)) $parentId = uniqid(); //if(!is_null($logDir)) { QueueLogger::initClient($parentId,$logDir); //} QueueLogger::getConfig()->logLevelsFromTransport(empty($logSettings) ? "" : $logSettings); $utilWorker = new Worker($preloadPackets,$maxWaitBeforeExit,$parentId,$workerId,$queueId); /** * @name WorkerClient */ class Worker { private $id; private $delay = 0.1; private $maxWaitBeforeExit = 5; public static $bootstrap; private $running = false; private $checking = false; private $terminated = false; private $queueId = null; private $queueWorkerId = null; private $readyLastSent; /** @var \React\Stream\ReadableResourceStream */ private $stdin; /** @var \React\Stream\WritableResourceStream */ private $stdout,$stderr; private $partialLine = ""; private $buffersInit = false; public function __construct($preloadPackets=null,$maxWaitBeforeExit=5,$parentId=null,$queueWorkerId=null,$queueId=null) { $this->id = !is_null($parentId) ? $parentId : uniqid(); QueueLogger::setObject($this); $this->maxWaitBeforeExit = $maxWaitBeforeExit; $this->queueWorkerId = $queueWorkerId; $this->queueId = $queueId; stream_set_blocking(STDIN, 0); if(!empty($preloadPackets) && is_array($preloadPackets)) { foreach($preloadPackets as $packet) { $this->processPacket($packet); } } if(($ml = dhGlobal::get("asyncMaxLogResponseLength",false)) !== false) { $this->maxLogResponseLength = $ml; } $this->stdin = new ReadableResourceStream(STDIN); $this->stdout = new WritableResourceStream(STDOUT); $this->stderr = new WritableResourceStream(STDERR); $this->stdin ->on("data", function($chunk) { $this->onInput("stdin",$chunk); }); $this->stdout ->on("data", function($chunk) { $this->onInput("stdout",$chunk); }); $this->stderr ->on("data", function($chunk) { $this->onInput("stderr",$chunk); }); $this->stdin ->on("close",function() { $this->onClose("stdin"); }); $this->stdout ->on("close",function() { $this->onClose("stdout"); }); $this->stderr ->on("close",function() { $this->onClose("stderr"); }); $this->stdin ->on("error",function(\Exception $e) { $this->onError("stdin",$e); }); $this->stdout ->on("error",function(\Exception $e) { $this->onError("stdout",$e); }); $this->stderr ->on("error",function(\Exception $e) { $this->onError("stderr",$e); }); Loop::addPeriodicTimer($this->delay,function($timer) { if(!$this->running && !$this->checking) { $this->sendMessage($this->makeReadyMessage(),false); $this->checking=true; $this->readyLastSent = microtime(true); } if($this->maxWaitBeforeExit>0 && $this->checking && !is_null($this->readyLastSent) && microtime(true)-$this->readyLastSent>$this->maxWaitBeforeExit) { $elapsed = microtime(true)-$this->readyLastSent; $this->sendMessage($this->responseMessage("info",["what"=>"terminating","why"=>round($elapsed,2)." seconds elapsed since 'ready' was sent, exceeds limit of ".$this->maxWaitBeforeExit." seconds"]),false); $this->terminate(false); } }); $this->buffersInit = true; $this->logInfo("init complete, started loop"); } public function __destruct() { if(!$this->terminated) { $this->logInfo("process exited abruptly",debug_backtrace()); } $this->logInfo("process exited"); } private function onInput($streamType,$chunk) { if($streamType == "stdin") { if(substr($chunk,-1) != "\n") { $this->partialLine.=$chunk; } else { $this->logMsgRecv(trim($this->partialLine.$chunk)); $data = explode("\n",$this->partialLine.$chunk); $this->partialLine=""; foreach($data as $line) { $this->processPacket($line); } } } else { $this->logInfo("[".$streamType."]",trim($chunk)); } } private function onClose($streamType) { $this->logInfo("[".$streamType."]","---CLOSED---"); } private function onError($streamType,\Exception $e) { $this->logInfo("[".$streamType."]","---ERROR---",$e->getMessage()); } public function processPacket($framedPacket) { if(($message = Message::fromPacket($framedPacket)) !== false) { $this->logDebug("processPacket: $framedPacket"); $this->handlePacketMessage($message)->then(function($message) { $this->sendMessage($message); },function($message) { $this->sendMessage($message); }); } else { if(!empty(trim($framedPacket))) { $this->logError("unable to parse packet: $framedPacket"); } } } private function handlePacketMessage($message) { $packetDeferred = new Deferred(); $this->running=true; if(!$message->isValid()) { $packetDeferred->reject($this->makeErrorMessage(0,Message::E_FRAME_ERROR,"unable to parse message frame")); } else { $workId = $message->getWorkId(); $this->sendMessage($this->makeAckMessage($workId),false); if($workId == "nowork"){ $this->processNoWork($workId,$message,$packetDeferred); } elseif($workId == "ping") { $this->processPing($workId,$message,$packetDeferred); } elseif($message->get("callable",false) !== false) { $this->processCallable($workId,$message,$packetDeferred); } else { $this->checking=false; $packetDeferred->reject($this->makeErrorMessage($workId,Message::E_UNKNOWN_FRAME,$message->get())); } } return $packetDeferred->promise(); } private function processPing($workId,$message,&$packetDeferred) { $packetDeferred->resolve($this->makePongMessage()); } private function processNoWork($workId,$message,&$packetDeferred) { $this->checking=false; $packetDeferred->resolve(true); Loop::stop(); $this->terminate(); return $packetDeferred; } private function processCallable($workId,$message,&$packetDeferred) { if(($callable = $message->get("callable",false)) !== false) { $this->checking=false; $args = $message->get("args",[]); $asJson = $message->get("asJson",false); $stdOutFromExec = ""; try { ob_start(); $executeResult = $this->execute($workId,$callable,$args,$asJson); $stdOutFromExec = ob_get_contents(); if(!empty($stdOutFromExec)) { dhGlobal::info("Exec Output Buffer:",$stdOutFromExec); } ob_end_clean(); } catch(\Exception $e) { $packetDeferred->reject($this->makeErrorMessage($workId,Message::E_EXECUTE_EXCEPTION,$e->getMessage(),$e->getTrace())); } if($executeResult instanceof Message) { $packetDeferred->resolve($executeResult); } elseif($executeResult instanceof ExceptionMessage) { $packetDeferred->reject($executeResult); } elseif($executeResult instanceof PromiseInterface) { $executeResult->then(function($executeMessage) use ($packetDeferred) { $packetDeferred->resolve($executeMessage); },function($e) use($workId,$packetDeferred) { $packetDeferred->reject($this->makeErrorMessage($workId,Message::E_EXECUTE_EXCEPTION,$e->getMessage(),$e->getTrace())); }); } } else { $packetDeferred->resolve(false); } return $packetDeferred; } private function execute($workId,$callable,$args,$asJson=false) { $this->logExecuteStart($callable,$workId); if(!is_array($args)) { $args = [$args]; } $result = $output = ""; $trace = null; $success = false; if(!is_callable($callable)) { $this->logExecuteError($callable,Message::E_NOT_CALLABLE,"",$workId); $stringCallable = is_array($callable) ? implode("::",$callable) : $callable; return $this->makeErrorMessage($workId,Message::E_NOT_CALLABLE,"$stringCallable is not callable",null); } try { $result = call_user_func($callable,...$args); $success = true; } catch (WorkerException $e) { $this->logExecuteError($callable,"worker_exception",$e->getMessage(),$workId); return $this->makeErrorMessage($workId,$e->getType(),$e->getMessage()); } catch (\Exception $e) { $success=false; $result = $e->getMessage(); $trace = $e->getTrace(); } if($result instanceof \React\Promise\PromiseInterface) { return $this->handleExecDeferredResult($result,$workId,$success,$output,$callable,$asJson,$trace); } else { return $this->handleExecDirectResult($result,$workId,$success,$output,$callable,$asJson,$trace); } } private function handleExecDirectResult($result,$workId,$success,$output,$callable,$asJson,$trace) { $rawResult = null; if($success && $asJson && !is_array($result)) { $rawResult = $result; $result = json_decode($result,true); } if(!$success) { $this->logExecuteError($callable,Message::E_EXECUTE_EXCEPTION,$result,$workId); return $this->makeErrorMessage($workId,Message::E_EXECUTE_EXCEPTION,$result,$trace); } $returnData = ["callable"=>$callable,"result"=>$result,"stdout"=>$output]; if(!is_null($rawResult)) { $returnData["raw"]=$rawResult; } $this->logExecuteSuccess($callable,$workId); return $this->makeSuccessMessage($workId,$returnData); } private function handleExecDeferredResult($result,$workId,$success,$output,$callable,$asJson,$trace) { $execDeferred = new Deferred(); $result->then(function($actualResult) use ($workId,$success,$output,$callable,$asJson,$trace,&$execDeferred) { $rawResult = null; if($success && $asJson && !is_array($actualResult)) { $rawResult = $actualResult; $actualResult = json_decode($actualResult,true); } if(!$success) { $this->logExecuteError($callable,Message::E_EXECUTE_EXCEPTION,$rawResult,$workId); $execDeferred->reject($this->makeErrorMessage($workId,Message::E_EXECUTE_EXCEPTION,$actualResult,$trace)); return false; } else { $returnData = ["callable"=>$callable,"result"=>$actualResult,"stdout"=>$output]; if(!is_null($rawResult)) { $returnData["raw"]=$rawResult; } $this->logExecuteSuccess($callable,$workId); $execDeferred->resolve($this->makeSuccessMessage($workId,$returnData)); } },function($e) use ($workId,$callable,&$execDeferred) { if(is_object($e) && method_exists($e,"getMessage")) { $result = $e->getMessage(); } else { $result = $e; } if(is_object($e) && method_exists($e,"getTrace")) { $trace = $e->getTrace(); } else { $trace = null; } $this->logExecuteError($callable,Message::E_EXECUTE_EXCEPTION,$result,$workId); $execDeferred->resolve($this->makeErrorMessage($workId,Message::E_EXECUTE_EXCEPTION,$result,$trace)); return false; }); return $execDeferred->promise(); } private function logExecuteStart($callable,$msg="",$workId=null) { $stringCallable = is_array($callable) ? implode("::",$callable) : $callable; $this->logInfo("execute",$stringCallable,$workId,"START ",$msg); } private function logExecuteSuccess($callable,$msg="",$workId=null) { $stringCallable = is_array($callable) ? implode("::",$callable) : $callable; $this->logInfo("execute",$stringCallable,$workId,"SUCCESS ".$msg); } private function logExecuteError($callable,$type="",$msg="",$workId=null) { $stringCallable = is_array($callable) ? implode("::",$callable) : $callable; $this->logError("execute",$stringCallable,$workId,strtoupper($type)." - ".$msg); } public function makeSuccessMessage($workId,$data) { return $this->responseMessage($workId,$data); } public function makeErrorMessage($workId,$code,$message="",$trace=null) { return $this->responseMessage($workId,null,["code"=>$code,"message"=>$message,"trace"=>null],true); } public function makeReadyMessage() { return $this->responseMessage("ready",["time"=>microtime(true),"mem"=>memory_get_usage()]); } public function makeAckMessage($workId=null) { return $this->responseMessage("ack",["workId"=>$workId,"time"=>microtime(true),"mem"=>memory_get_usage()]); } public function makePongMessage() { return $this->responseMessage("pong",["time"=>microtime(true),"mem"=>memory_get_usage()]); } public function makeLogMessage($message,$logLevel="info") { if(is_array($message)) { $message = implode(" ",$message); } return $this->responseMessage("log",["message"=>$message,"level"=>$logLevel,"time"=>microtime(true),"mem"=>memory_get_usage()]); } /** * @param string $workId * @param mixed $data * @param mixed $error * @param bool $asException * @return Message|ExceptionMessage */ public function responseMessage($workId,$data=null,$error=null,$asException=false) { $message = new Message(); $message->setWorkId($workId); $message->setData($data); $message->setError($error); $message->time(microtime(true)); $message->mem(memory_get_usage()); if($asException) { return new ExceptionMessage($message,1); } return $message; } /** * * @param Message|ExceptionMessage $message * @param bool $updateRunning * @param bool $skipLog * @return void */ public function sendMessage($message,$updateRunning=true,$skipLog=false) { if($message instanceof ExceptionMessage) { $message = $message->messageObject(); } if(!$skipLog) { $this->logMsgSend($message->toPacket()); } $this->stdout->write($message->toPacket()."\n"); if($updateRunning) { $this->running=false; } } private function terminate($success=true) { $this->terminated=true; if(!$success) { exit(1); } exit(); } private function log($logLevel,...$args) { if($logLevel == "info") { $this->logInfo(...$args); } else if($logLevel == "error") { $this->logError(...$args); } else if($logLevel == "debug") { $this->logDebug(...$args); } else { QueueLogger::log($logLevel,...$args); } } private function logMsgSend(...$args) { QueueLogger::msgSend($this,...$args); } private function logMsgRecv(...$args) { QueueLogger::msgRecv($this,...$args); } private function logInfo(...$args) { QueueLogger::info($this,...$args); } private function logError(...$args) { QueueLogger::error($this,...$args); } private function logDebug(...$args) { QueueLogger::debug($this,...$args); } public function getId() { return $this->id; } public function id() { return $this->id; } public function queueWorkerId() { return $this->queueWorkerId; } public function queueId() { return $this->queueId; } /** * Send a log message to the parent process * @param mixed $message * @param string $logLevel info|error|debug * @return void */ public function sendLogMessage($message,$logLevel="info") { $this->sendMessage($this->makeLogMessage($message,$logLevel),false,true); } public static function bootstrap($bootstrap) { return WorkerUtils::bootstrap($bootstrap); } public static function testError(...$args) { return WorkerUtils::testError(...$args); } public static function exec(...$args) { return WorkerUtils::exec(...$args); } public static function http(...$args) { return WorkerUtils::http(...$args); } } //helper function for initial load function cwSetIfArg($key,$arg,&$var) { if(substr($arg,0,strlen($key)+1) == $key.":") { $var = substr($arg,strlen($key)+1); return $var; } return false; } function cwPacketOrInclude($arg,&$preloadPackets,&$includes) { if(substr($arg,0,4) == "inc:") { $includes[] = substr($arg,4); return true; } if(substr($arg,0,4) == "pkt:") { $preloadPackets[] = substr($arg,4); return true; } return false; }src/worker/WorkerException.php000064400000000532144761607050012517 0ustar00type = $type; parent::__construct($message, $code, $previous); } public function getType() { return $this->type; } }src/worker/WorkerUtils.php000064400000012062144761607050011662 0ustar0048) { throw new \Exception($message,$code); } } public static function exec(...$args) { if(!empty($args) && !is_null($args)) { $command = implode(" ",$args); passthru($command); return ["status"=>true,"command"=>$command]; } throw new \Exception("static::exec requires at least 1 arg"); return ["status"=>false]; } public static function http(...$args) { $reqData = $args[0]; $method = dhGlobal::getVal($reqData,"method",false); $url = dhGlobal::getVal($reqData,"url",false); $getJson = dhGlobal::getVal($reqData,"getJson",false); if($method === false || $url === false) { throw new \Exception("static::http requires 1 arg, that arg should be an array ['method'=>'get|post|..','url'=>'requestUrl','options'=>[]"); return false; } $http = new \boru\dhutils\dhHttp(); $req = $http->request($method,$url); if(($options = dhGlobal::getVal($reqData,"options",false)) !== false) { foreach($options as $type=>$option) { if(method_exists($req,$type)) { if(!isset($option[1])) { $req->$type($option[0]); } else { $req->$type($option[1],$option[2]); } } } } elseif(($requestData = dhGlobal::getval($reqData,"requestData",false)) !== false) { $req->data = $requestData; } $response = $req->send(); if($getJson) { $body = $response->body(true); } else { $body = $response->body(); } return [ "method"=>$method, "url"=>$url, "code"=>$response->code(), "phrase"=>$response->phrase(), "headers"=>$response->header(), "body"=>$body, ]; } public static function bootstrap($bootstrap) { if(!is_null(static::$bootstrap)) { throw new WorkerException(Message::E_BOOSTRAPPED_ALREADY,"already bootstrapped with ".json_encode(static::$bootstrap)); } $bootstrapFile = false; $setupCallable = false; $setupCallableArgs = []; if(!is_array($bootstrap)) { $bootstrapFile = $bootstrap; } elseif(isset($bootstrap["file"]) || isset($bootstrap["callable"])) { $bootstrapFile = isset($bootstrap["file"]) ? $bootstrap["file"] : false; $setupCallable = isset($bootstrap["callable"]) ? $bootstrap["callable"] : false; $setupCallableArgs = isset($bootstrap["args"]) ? $bootstrap["args"] : []; } else { throw new WorkerException(Message::E_BOOTSTRAP_INVALID,"Bootstrap must either be a filename or an array that includes 'file' and/or 'callable'"); } if(!__bootstrap_include($bootstrapFile)) { throw new WorkerException(Message::E_BOOTSTRAP_NOT_FOUND,"File not found: ".$bootstrap); } if(($callableResponse = __bootstrap_call($setupCallable,$setupCallableArgs)) === false) { if(is_array($setupCallable)) { $setupCallable = implode("::",$setupCallable); } throw new WorkerException(Message::E_BOOTSTRAP_NOT_CALLABLE,"Cannot call ".$setupCallable); } static::$bootstrap = $bootstrap; $output = []; if($bootstrapFile !== false) { $output["file"]=$bootstrapFile; } if($callableResponse !== false) { if($setupCallable !== false) { $output["callable"]=$setupCallable; } if($setupCallableArgs !== false) { $output["args"]=$setupCallableArgs; } $output["response"] = $callableResponse; } return ["bootstrap"=>"static::bootstrap"]; } } function __bootstrap_include($file) { if($file === false) { return true; } if(!file_exists($file)) { return false; } include $file; return true; } function __bootstrap_call($callable,$args) { if($callable === false) { return true; } if(!is_callable($callable)) { return false; } try { $return = call_user_func($callable,...$args); } catch (\Exception $e) { $return = false; } return $return; }