diff --git a/examples/Databases.php b/examples/Databases.php index c79e297..8cc6188 100644 --- a/examples/Databases.php +++ b/examples/Databases.php @@ -37,6 +37,8 @@ public function __construct(array $config){ public function run() : void{ //set up the connection for tasks to use self::$connection = new PDO(...unserialize($this->config)); + self::$connection->exec("CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY AUTOINCREMENT, value TEXT)"); + self::$connection->exec("INSERT INTO test (value) VALUES ('Hello world!')"); } /** @@ -45,6 +47,24 @@ public function run() : void{ public static function getConnection() : \PDO{ return self::$connection; } } +class DBFetchTask extends Runnable{ + private string $result; + + public function run() : void{ + $pdo = PDOWorker::getConnection(); + $statement = $pdo->prepare("SELECT * FROM test"); + $statement->execute(); + + //we're serializing here because fetchAll returns an array, which is not thread-safe + //you might not need to do this if your results are simple enough to be thread-safe + $this->result = serialize($statement->fetchAll()); + } + + public function getResult() : array{ + return unserialize($this->result); + } +} + /* * When the Pool starts new Worker threads, they will construct the * PDO object before any Runnable tasks are executed. @@ -56,11 +76,23 @@ public static function getConnection() : \PDO{ return self::$connection; } */ for($i = 0; $i < 10; $i++){ - $pool->submit(new class extends Runnable{ - public function run() : void{ - var_dump(\PDOWorker::getConnection()); - } - }); + $pool->submit(new DBFetchTask()); +} + +/* + * Make sure to regularly call collect() on the pool, otherwise memory will be leaked. + * You can use collect() to fetch the results of tasks. + * If you don't care about the results, call collect() without a callback. + */ +while($pool->collect(function(Runnable $runnable) : bool{ + if($runnable instanceof DBFetchTask){ + var_dump($runnable->getResult()); + } + return true; //returning false will stop the collection +}) > 0){ + echo "Still waiting\n"; + usleep(1_000_000); + //you probably don't want to do this in a loop in real code, as it will block until all tasks are finished } $pool->shutdown(); diff --git a/examples/MySQLi.php b/examples/MySQLi.php deleted file mode 100644 index faff543..0000000 --- a/examples/MySQLi.php +++ /dev/null @@ -1,88 +0,0 @@ -hostname = $hostname; - $this->username = $username; - $this->password = $password; - $this->database = $database; - $this->port = $port; - } - - public function getConnection() { - if (!self::$link) { - self::$link = new mysqli( - $this->hostname, - $this->username, - $this->password, - $this->database, - $this->port); - } - - /* do some exception/error stuff here maybe */ - - return self::$link; - } - - protected $hostname; - protected $username; - protected $password; - protected $database; - protected $port; - - /** - * Note that the link is stored statically, which for pmmpthread, means thread local - **/ - protected static $link; -} - -class Query extends Runnable { - - public function __construct(string $sql, ThreadSafeArray $store) { - $this->sql = $sql; - $this->result = $store; - } - - public function run() : void { - $mysqli = $this->worker->getConnection(); - - $result = $mysqli->query($this->sql); - - if ($result) { - while (($row = $result->fetch_assoc())) { - $this->result[] = "{$row['Id']}) {$row['User']} ({$row['State']})"; - } - } - } - - protected $sql; - protected $result; -} - -$pool = new Pool(4, "Connect", ["localhost", "root", "", "mysql"]); -$stores = []; - -for ($i = 0; $i < 6; ++$i) { - $store = new ThreadSafeArray(); // store all results in here for the Query object - $pool->submit(new Query("SHOW PROCESSLIST;", $store)); - $stores[] = $store; // maintain a list of stores to retrieve their results later -} - -$pool->collect(); // collect all finished work to free up memory -$pool->shutdown(); // shutdown the pool to make sure it has completely finished executing - -print_r($stores); // output all results for all Query objects - -?> diff --git a/examples/Pooling.php b/examples/Pooling.php deleted file mode 100644 index 14f4e56..0000000 --- a/examples/Pooling.php +++ /dev/null @@ -1,121 +0,0 @@ -logger = $logger; - $this->config = serialize($config); - } - - /* - * The only thing to do here is setup the PDO object - */ - public function run() : void { - if (isset($this->config)) { - self::$connection = new PDO(... unserialize($this->config)); - } - } - - public function getLogger() { return $this->logger; } - public function getConnection() { return self::$connection; } - - private $logger; - private $config; - private static $connection; -} - -class WebWork extends Runnable { - /* - * An example of some work that depends upon a shared logger - * and a thread-local PDO connection - */ - public function run() : void { - $logger = $this->worker->getLogger(); - $logger->log("%s executing in Thread #%lu", - __CLASS__, $this->worker->getThreadId()); - - if ($this->worker->getConnection()) { - $logger->log("%s has PDO in Thread #%lu", - __CLASS__, $this->worker->getThreadId()); - } - } -} - -class SafeLog extends ThreadSafe { - - /* - * If logging were allowed to occur without synchronizing - * the output would be illegible - */ - public function log($message, ... $args) { - $this->synchronized(function($message, ... $args){ - if (is_callable($message)) { - $message(...$args); - } else echo vsprintf("{$message}\n", ... $args); - }, $message, $args); - } -} - -$logger = new SafeLog(); - -/* -* Constructing the Pool does not create any Threads -*/ -$pool = new Pool(8, 'WebWorker', [$logger, ["sqlite:example.db"]]); - -/* -* Only when there is work to do are threads created -*/ -$pool->submit(new WebWork()); -$pool->submit(new WebWork()); -$pool->submit(new WebWork()); -$pool->submit(new WebWork()); -$pool->submit(new WebWork()); -$pool->submit(new WebWork()); -$pool->submit(new WebWork()); -$pool->submit(new WebWork()); -$pool->submit(new WebWork()); -$pool->submit(new WebWork()); -$pool->submit(new WebWork()); -$pool->submit(new WebWork()); -$pool->submit(new WebWork()); -$pool->submit(new WebWork()); - -/* -* The Workers in the Pool retain references to the WebWork objects submitted -* in order to release that memory the Pool::collect method must be invoked in the same -* context that created the Pool. -* -* The Worker::collect method is invoked for every Worker in the Pool, the garbage list -* for each Worker is traversed and each piece of work (a WebWork object, in this case) -* is passed to the optionally provided Closure. -* -* Collecting in a continuous loop will cause the garbage list to be emptied. -*/ -while ($pool->collect()); - -/* -* We could submit more stuff here, the Pool is still waiting for work to be submitted. -*/ -$logger->log(function($pool) { - var_dump($pool); -}, $pool); - -/* -* Shutdown Pools at the appropriate time, don't leave it to chance ! -*/ -$pool->shutdown(); -?>