From 69b57c345c97c3760bed30211ec9d50be4d6e858 Mon Sep 17 00:00:00 2001 From: Tim Haasdyk Date: Tue, 28 May 2024 23:25:35 +0200 Subject: [PATCH] Bug/fix long pushes end up in invalid state (#12) * Fix unit tests * Add failing unit test for hg incoming timeout * Introduce verify state and prevent invalid transaction state. * fixup! Introduce verify state and prevent invalid transaction state. * Make fake-polling use tiny chunk-size * Rename synchronize and prevent late unexpected failures in tests * Fix indentation and unused variable. --- api/src/AsyncRunner.php | 14 ++--- api/src/BundleHelper.php | 1 + api/src/HgResumeApi.php | 110 ++++++++++++++++++---------------- api/src/HgResumeResponse.php | 23 +++++++ api/src/HgRunner.php | 70 +++++++++++++++------- api/src/RestServer.php | 6 ++ api/test/AsyncRunner_Test.php | 5 +- api/test/HgResumeApi_Test.php | 44 ++++++++++++-- api/test/HgRunner_Test.php | 6 +- 9 files changed, 191 insertions(+), 88 deletions(-) diff --git a/api/src/AsyncRunner.php b/api/src/AsyncRunner.php index 3a3bf34..ad67bd9 100644 --- a/api/src/AsyncRunner.php +++ b/api/src/AsyncRunner.php @@ -64,17 +64,17 @@ public function cleanUp() { } /** - * - * @throws AsyncRunnerException + * Waits for up to 5s for the runner to complete + * @return boolean True if complete, otherwise not complete yet */ - public function synchronize() { - for ($i = 0; $i < 200; $i++) { + public function waitForIsComplete() { + for ($i = 0; $i < 5; $i++) { if ($this->isComplete()) { - return; + return true; } - usleep(500000); + usleep(1000000); } - throw new AsyncRunnerException("Error: Long running process exceeded 100 seconds while waiting to synchronize"); + return false; } } diff --git a/api/src/BundleHelper.php b/api/src/BundleHelper.php index 4fb67d5..4aa585a 100644 --- a/api/src/BundleHelper.php +++ b/api/src/BundleHelper.php @@ -6,6 +6,7 @@ class BundleHelper { const State_Bundle = 'Bundle'; const State_Downloading = 'Downloading'; const State_Uploading = 'Uploading'; + const State_Validating = 'Validating'; const State_Unbundle = 'Unbundle'; private $_transactionId; diff --git a/api/src/HgResumeApi.php b/api/src/HgResumeApi.php index 373955f..746211e 100644 --- a/api/src/HgResumeApi.php +++ b/api/src/HgResumeApi.php @@ -91,66 +91,72 @@ function pushBundleChunk($repoId, $bundleSize, $offset, $data, $transId) { $bundle->setOffset($newSow); // for the final chunk; assemble the bundle and apply the bundle - if ($newSow == $bundleSize) { - $bundle->setState(BundleHelper::State_Unbundle); - try { // REVIEW Would be nice if the try / catch logic was universal. ie one policy for the api function. CP 2012-06 - $bundleFilePath = $bundle->getBundleFileName(); - $asyncRunner = $hg->unbundle($bundleFilePath); - for ($i = 0; $i < 4; $i++) { - if ($asyncRunner->isComplete()) { - if (BundleHelper::bundleOutputHasErrors($asyncRunner->getOutput())) { - $responseValues = array('transId' => $transId); - return new HgResumeResponse(HgResumeResponse::RESET, $responseValues); - } - $bundle->cleanUp(); - $asyncRunner->cleanUp(); - $responseValues = array('transId' => $transId); - return new HgResumeResponse(HgResumeResponse::SUCCESS, $responseValues); - } - sleep(1); - } - $responseValues = array('transId' => $transId, 'sow' => $newSow); - // If the unbundle operation has not completed within 4 seconds (unlikely for a large repo) then we punt back to the client with a RECEIVED. - //The client can then initiate another request, and hopefully the unbundle will have finished by then - // FUTURE: we should return an INPROGRESS status and have the client display a message accordingly. - cjh 2014-07 - return new HgResumeResponse(HgResumeResponse::RECEIVED, $responseValues); - } catch (UnrelatedRepoException $e) { - $bundle->setOffset(0); - $responseValues = array('Error' => substr($e->getMessage(), 0, 1000)); - $responseValues['transId'] = $transId; - return new HgResumeResponse(HgResumeResponse::FAIL, $responseValues); - } catch (Exception $e) { - // REVIEW The RESET response may not make sense in this context anymore. Why would we want to tell the client to resend a bundle if it failed the first time? My guess is never. cjh 2013-03 - //echo $e->getMessage(); // FIXME - $bundle->setOffset(0); - $responseValues = array('Error' => substr($e->getMessage(), 0, 1000)); - $responseValues['transId'] = $transId; - return new HgResumeResponse(HgResumeResponse::RESET, $responseValues); - } - } else { + if ($newSow != $bundleSize) { // received the chunk, but it's not the last one; we expect more chunks $responseValues = array('transId' => $transId, 'sow' => $newSow); return new HgResumeResponse(HgResumeResponse::RECEIVED, $responseValues); } - break; + // We got everything, fall through to completePushBundle + case BundleHelper::State_Validating: case BundleHelper::State_Unbundle: - $bundleFilePath = $bundle->getBundleFileName(); - $asyncRunner = new AsyncRunner($bundleFilePath); - if ($asyncRunner->isComplete()) { - if (BundleHelper::bundleOutputHasErrors($asyncRunner->getOutput())) { + return $this->completePushBundle($bundle, $hg, $transId, $bundleSize); + } + } + + /** + * + * @param BundleHelper $bundle + * @param HgRunner $hg + * @param string $transId + * @throws UnrelatedRepoException + * @throws Exception + * @return HgResumeResponse + */ + function completePushBundle($bundle, $hg, $transId, $bundleSize) { + try { // REVIEW Would be nice if the try / catch logic was universal. ie one policy for the api function. CP 2012-06 + $bundleFilePath = $bundle->getBundleFileName(); + + switch ($bundle->getState()) { + case BundleHelper::State_Uploading: + $bundle->setState(BundleHelper::State_Validating); + $hg->startValidating($bundleFilePath); + // fall through to State_Validating + case BundleHelper::State_Validating: + if ($hg->finishValidating($bundleFilePath)) { + $bundle->setState(BundleHelper::State_Unbundle); + $hg->unbundle($bundleFilePath); + } else { + return HgResumeResponse::PendingResponse($transId, 'Verification in progress...', $bundleSize); + } + // fall through to State_Unbundle + case BundleHelper::State_Unbundle: + $asyncRunner = new AsyncRunner($bundleFilePath); + if ($asyncRunner->waitForIsComplete()) { + if (BundleHelper::bundleOutputHasErrors($asyncRunner->getOutput())) { + $responseValues = array('transId' => $transId); + // REVIEW The RESET response may not make sense in this context anymore. Why would we want to tell the client to resend a bundle if it failed the first time? My guess is never. cjh 2013-03 + return new HgResumeResponse(HgResumeResponse::RESET, $responseValues); + } + $bundle->cleanUp(); + $asyncRunner->cleanUp(); $responseValues = array('transId' => $transId); - // REVIEW The RESET response may not make sense in this context anymore. Why would we want to tell the client to resend a bundle if it failed the first time? My guess is never. cjh 2013-03 - return new HgResumeResponse(HgResumeResponse::RESET, $responseValues); + return new HgResumeResponse(HgResumeResponse::SUCCESS, $responseValues); + } else { + return HgResumeResponse::PendingResponse($transId, 'Unpacking in progress...', $bundleSize); } - $bundle->cleanUp(); - $asyncRunner->cleanUp(); - $responseValues = array('transId' => $transId); - return new HgResumeResponse(HgResumeResponse::SUCCESS, $responseValues); - } else { - $responseValues = array('transId' => $transId, 'sow' => $bundle->getOffset()); - return new HgResumeResponse(HgResumeResponse::RECEIVED, $responseValues); } - break; + } catch (UnrelatedRepoException $e) { + $bundle->setOffset(0); + $responseValues = array('Error' => substr($e->getMessage(), 0, 1000)); + $responseValues['transId'] = $transId; + return new HgResumeResponse(HgResumeResponse::FAIL, $responseValues); + } catch (Exception $e) { + // REVIEW The RESET response may not make sense in this context anymore. Why would we want to tell the client to resend a bundle if it failed the first time? My guess is never. cjh 2013-03 + //echo $e->getMessage(); // FIXME + $bundle->setOffset(0); + $responseValues = array('Error' => substr($e->getMessage(), 0, 1000)); + $responseValues['transId'] = $transId; + return new HgResumeResponse(HgResumeResponse::RESET, $responseValues); } } diff --git a/api/src/HgResumeResponse.php b/api/src/HgResumeResponse.php index e7b884f..8f312cc 100644 --- a/api/src/HgResumeResponse.php +++ b/api/src/HgResumeResponse.php @@ -61,6 +61,13 @@ class HgResumeResponse { // HTTP 202 Accepted const INPROGRESS = 9; + /* TIMEOUT + * The server is in the middle of an operation that was not completed during the request. + * The client should poll the server to ensure any additional operations are started. + * Used only by pushBundleChunk */ + // HTTP 408 RequestTimeout + const TIMEOUT = 9; + public $Code; public $Values; public $Content; @@ -72,6 +79,22 @@ function __construct($code, $values = array(), $content = "", $version = API_VER $this->Content = $content; $this->Version = $version; } + + /** + * There's no explicit polling mechanism in the v3 client. + * But we can make it retry by sending any status code it's not familiar with. + * We can also determine the chunk size the client will use, so we make it super tiny to minimize unnecessary data transfer. + * @param string $transId + * @param string $note + * @param int $bundleSize + * @return HgResumeResponse + */ + public static function PendingResponse($transId, $note, $bundleSize) { + return new HgResumeResponse(HgResumeResponse::TIMEOUT, array( + 'transId' => $transId, 'Note' => $note, + // Make the client send only 10 bytes + 'sow' => $bundleSize - 10)); + } } ?> \ No newline at end of file diff --git a/api/src/HgRunner.php b/api/src/HgRunner.php index 4b186d7..b57f686 100644 --- a/api/src/HgRunner.php +++ b/api/src/HgRunner.php @@ -42,14 +42,10 @@ private function logEvent($message) { } /** - * * @param string $filepath * @throws HgException - * @throws UnrelatedRepoException - * @throws Exception - * @return AsyncRunner */ - function unbundle($filepath) { + function startValidating($filepath) { if (!is_file($filepath)) { throw new HgException("bundle file '$filepath' is not a file!"); } @@ -58,19 +54,53 @@ function unbundle($filepath) { // run hg incoming to make sure this bundle is related to the repo $cmd = "hg incoming $filepath"; $this->logEvent("cmd: $cmd"); - $asyncRunner = new AsyncRunner($filepath . ".incoming"); + $asyncRunner = $this->getValidationRunner($filepath); $asyncRunner->run($cmd); - $asyncRunner->synchronize(); - $output = $asyncRunner->getOutput(); - if (preg_match('/abort:.*unknown parent/', $output)) { - throw new UnrelatedRepoException("Project is unrelated! (unrelated bundle pushed to repo)"); - } - if (preg_match('/parent:\s*-1:/', $output)) { - throw new UnrelatedRepoException("Project is unrelated! (unrelated bundle pushed to repo)"); + } + + /** + * @param string $filepath + * @throws UnrelatedRepoException + * @throws Exception + * @return boolean True if finished, otherwise not finished yet + */ + function finishValidating($filepath) { + $asyncRunner = $this->getValidationRunner($filepath); + if ($asyncRunner->waitForIsComplete()) { + $output = $asyncRunner->getOutput(); + if (preg_match('/abort:.*unknown parent/', $output)) { + throw new UnrelatedRepoException("Project is unrelated! (unrelated bundle pushed to repo)"); + } + if (preg_match('/parent:\s*-1:/', $output)) { + throw new UnrelatedRepoException("Project is unrelated! (unrelated bundle pushed to repo)"); + } + if (preg_match('/abort:.*not a Mercurial bundle/', $output)) { + throw new Exception("Project cannot be updated! (corrupt bundle pushed to repo)"); + } + return true; } - if (preg_match('/abort:.*not a Mercurial bundle/', $output)) { - throw new Exception("Project cannot be updated! (corrupt bundle pushed to repo)"); + return false; + } + + /** + * @param string $filepath + * @return AsyncRunner + */ + function getValidationRunner($filepath) { + return new AsyncRunner($filepath . ".incoming"); + } + + /** + * + * @param string $filepath + * @throws HgException + * @return AsyncRunner + */ + function unbundle($filepath) { + if (!is_file($filepath)) { + throw new HgException("bundle file '$filepath' is not a file!"); } + chdir($this->repoPath); // NOTE: I tried with -R and it didn't work for me. CP 2012-06 $cmd = "hg unbundle $filepath"; $this->logEvent("cmd: $cmd"); @@ -78,10 +108,6 @@ function unbundle($filepath) { $asyncRunner->run($cmd); return $asyncRunner; } - - function assertIsRelatedRepo($bundleFilePath) { - - } /** * @@ -121,13 +147,15 @@ function makeBundle($baseHashes, $bundleFilePath) { } /** - * helper function, mostly for tests + * Helper function for tests * @param string $baseHashes[] * @param string $bundleFilePath */ function makeBundleAndWaitUntilFinished($baseHashes, $bundleFilePath) { $asyncRunner = $this->makeBundle($baseHashes, $bundleFilePath); - $asyncRunner->synchronize(); + if (!$asyncRunner->waitForIsComplete()) { + throw new Exception("Error: make bundle failed to complete"); + } return $asyncRunner; } diff --git a/api/src/RestServer.php b/api/src/RestServer.php index 9699099..7186d32 100644 --- a/api/src/RestServer.php +++ b/api/src/RestServer.php @@ -147,6 +147,12 @@ static function mapHgResponse($hgrCode) { $httpcode = "202 Accepted"; $codeString = 'INPROGRESS'; break; + case HgResumeResponse::TIMEOUT: + // 408 is not currently handled by the client, so it will be + // interpreted as a fail and result in a retry. That is the intentional behaviour for now (v03). + $httpcode = "408 RequestTimeout"; + $codeString = 'INPROGRESS'; + break; default: throw new Exception("Unknown response code {$response->Code}"); break; diff --git a/api/test/AsyncRunner_Test.php b/api/test/AsyncRunner_Test.php index 9dae6de..30d8516 100644 --- a/api/test/AsyncRunner_Test.php +++ b/api/test/AsyncRunner_Test.php @@ -10,7 +10,7 @@ function testRunIsComplete_FalseThenTrue() { $runner = new AsyncRunner('/tmp/testFile'); $runner->run('echo foo'); $this->assertFalse($runner->isComplete()); - $runner->synchronize(); + $runner->waitForIsComplete(); $this->assertTrue($runner->isComplete()); } @@ -47,7 +47,8 @@ function testGetOutput_NotComplete_Throws() { function testGetOutput_Complete_ReturnsOutput() { $runner = new AsyncRunner('/tmp/testFile'); $runner->run('echo abort'); - $runner->synchronize(); + $runner->waitForIsComplete(); + $this->assertTrue($runner->isComplete()); $data = $runner->getOutput(); $this->assertPattern('/abort/', $data); } diff --git a/api/test/HgResumeApi_Test.php b/api/test/HgResumeApi_Test.php index f62a20d..b6fd769 100644 --- a/api/test/HgResumeApi_Test.php +++ b/api/test/HgResumeApi_Test.php @@ -279,7 +279,7 @@ function testPullBundleChunk_PullUntilFinished_AssembledBundleIsValid() { $assembledBundle = ''; $bundleSize = 1; // initialize the bundleSize; it will be overwritten after the first API call - while (mb_strlen($assembledBundle) < $bundleSize) { + while (mb_strlen($assembledBundle, "8bit") < $bundleSize) { $response = $this->api->pullBundleChunkInternal('sampleHgRepo2', array($hash), $offset, $chunkSize, $transId, true); $this->assertEqual(HgResumeResponse::SUCCESS, $response->Code); $bundleSize = $response->Values['bundleSize']; @@ -302,7 +302,7 @@ function testPullBundleChunk_PullFromBaseRevisionUntilFinishedOnTwoBranchRepo_As $assembledBundle = ''; $bundleSize = 1; // initialize the bundleSize; it will be overwritten after the first API call - while (mb_strlen($assembledBundle) < $bundleSize) { + while (mb_strlen($assembledBundle, "8bit") < $bundleSize) { $response = $this->api->pullBundleChunkInternal('sample2branchHgRepo', array($hash), $offset, $chunkSize, $transId, true); $this->assertEqual(HgResumeResponse::SUCCESS, $response->Code); $bundleSize = $response->Values['bundleSize']; @@ -326,7 +326,7 @@ function testPullBundleChunk_PullFromTwoBaseRevisionsUntilFinishedOnTwoBranchRep $assembledBundle = ''; $bundleSize = 1; // initialize the bundleSize; it will be overwritten after the first API call - while (mb_strlen($assembledBundle) < $bundleSize) { + while (mb_strlen($assembledBundle, "8bit") < $bundleSize) { $response = $this->api->pullBundleChunkInternal('sample2branchHgRepo', $hashes, $offset, $chunkSize, $transId, true); $this->assertEqual(HgResumeResponse::SUCCESS, $response->Code); $bundleSize = $response->Values['bundleSize']; @@ -426,7 +426,7 @@ function testPullBundleChunk_BaseHashIsZero_ReturnsEntireRepoAsBundle() { $assembledBundle = ''; $bundleSize = 1; // initialize the bundleSize; it will be overwritten after the first API call - while (mb_strlen($assembledBundle) < $bundleSize) { + while (mb_strlen($assembledBundle, "8bit") < $bundleSize) { $response = $this->api->pullBundleChunkInternal('sampleHgRepo2', array($hash), $offset, $chunkSize, $transId, true); $this->assertEqual(HgResumeResponse::SUCCESS, $response->Code); $bundleSize = $response->Values['bundleSize']; @@ -493,6 +493,42 @@ function testPushBundleChunk_pushBundleFromUnrelatedRepo2_FailCodeWithMessage() $response = $this->api->pushBundleChunk('sampleHgRepo', $bundleSize, 0, $bundleData, $transId); $this->assertEqual(HgResumeResponse::FAIL, $response->Code); } + + function testPushBundleChunk_lastChunkTimesoutDuringHgIncoming_IsPickedUpByFollowUpRequest() { + $this->testEnvironment->makeRepo(TestPath . "/data/sampleHgRepo.zip"); + $transId = __FUNCTION__; + $this->api->finishPushBundle($transId); + + $bundleData = file_get_contents(TestPath . "/data/sample.bundle"); + + // Simulate a request timeout during hg unbundle + $this->pushFullBundle_KillDuringHgUnbundle('sampleHgRepo', $bundleData, $transId); + + // Ensure that a follow up pushBundle successfully completes the unbundle + $bundleSize = mb_strlen($bundleData, "8bit"); + $response = $this->api->pushBundleChunk('sampleHgRepo', $bundleSize, 0, $bundleData, $transId); + $this->assertEqual(HgResumeResponse::SUCCESS, $response->Code); + } + + function pushFullBundle_KillDuringHgUnbundle($repoId, $bundleData, $transId) { + // fork a child process so we can kill unfinished work + $pid = pcntl_fork(); + + if ($pid == -1) { + throw new Exception('Failed to fork'); + } elseif ($pid) { // PARENT + // Wait until hg incoming has started running + // it starts almost instantly and takes at least 500ms to run, because 500ms is the length of AsyncRunner's synchronize interval + usleep(250 * 1000); + // Kill so that nothing happens when hg incoming is done + posix_kill($pid, SIGTERM); + } else { // CHILD + $bundleSize = mb_strlen($bundleData, "8bit"); + // push the entire bundle so that hg incoming is triggered + $this->api->pushBundleChunk($repoId, $bundleSize, 0, $bundleData, $transId); + throw new Exception("The child process was protected by its buggy shield of digital faithlessness."); + } + } } ?> diff --git a/api/test/HgRunner_Test.php b/api/test/HgRunner_Test.php index f41b98f..82a97f0 100644 --- a/api/test/HgRunner_Test.php +++ b/api/test/HgRunner_Test.php @@ -36,10 +36,12 @@ function testUnbundle_BundleFileExists_BundleIsApplied() { // check for success file $hg = new HgRunner($repoPath); $asyncRunner = $hg->unbundle($bundleFile); - $asyncRunner->synchronize(); + $asyncRunner->waitForIsComplete(); + $this->assertTrue($asyncRunner->isComplete()); $asyncRunner->cleanUp(); $asyncRunner = $hg->update(); - $asyncRunner->synchronize(); + $asyncRunner->waitForIsComplete(); + $this->assertTrue($asyncRunner->isComplete()); $this->assertTrue(file_exists($successFile)); }