From 035c0d5ec0d6eb737313f43aa7f57431cd9888ab Mon Sep 17 00:00:00 2001 From: Katarzyna Marek Date: Tue, 11 Jul 2023 17:50:31 +0200 Subject: [PATCH 1/3] improvement: cancel current request in batched functions --- .../internal/metals/BatchedFunction.scala | 60 ++++++++++++++----- .../metals/BuildServerConnection.scala | 28 ++++----- .../meta/internal/metals/Compilations.scala | 13 +--- .../internal/metals/MetalsLspService.scala | 2 + .../metals/debug/BuildTargetClasses.scala | 10 +++- .../scala/tests/BatchedFunctionSuite.scala | 54 +++++++++++++++-- 6 files changed, 120 insertions(+), 47 deletions(-) diff --git a/metals/src/main/scala/scala/meta/internal/metals/BatchedFunction.scala b/metals/src/main/scala/scala/meta/internal/metals/BatchedFunction.scala index 0c6773ae679..a729ed1932c 100644 --- a/metals/src/main/scala/scala/meta/internal/metals/BatchedFunction.scala +++ b/metals/src/main/scala/scala/meta/internal/metals/BatchedFunction.scala @@ -1,12 +1,15 @@ package scala.meta.internal.metals +import java.util.concurrent.CancellationException import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise +import scala.util.Failure +import scala.util.Success +import scala.util.Try import scala.util.control.NonFatal import scala.meta.internal.async.ConcurrentQueue @@ -22,10 +25,12 @@ final class BatchedFunction[A, B]( fn: Seq[A] => CancelableFuture[B], functionId: String, shouldLogQueue: Boolean = false, + default: Option[B] = None, )(implicit ec: ExecutionContext) extends (Seq[A] => Future[B]) with Function2[Seq[A], () => Unit, Future[B]] with Pauseable { + val rand = new scala.util.Random /** * Call the function with the given arguments. @@ -75,8 +80,17 @@ final class BatchedFunction[A, B]( } def cancelAll(): Unit = { - queue.clear() - unlock() + val requests = ConcurrentQueue.pollAll(queue) + requests.foreach(_.result.complete(defaultResult)) + cancelCurrent() + } + + def cancelCurrent(): Unit = { + lock.get() match { + case None => + case Some(promise) => + promise.tryFailure(new BatchedFunction.BatchedFunctionCancelation) + } } def currentFuture(): Future[B] = { @@ -97,22 +111,28 @@ final class BatchedFunction[A, B]( callback: () => Unit, ) - private val lock = new AtomicBoolean() + private val lock = new AtomicReference[Option[Promise[B]]](None) + private def unlock(): Unit = { - lock.set(false) + lock.set(None) if (!queue.isEmpty) { runAcquire() } } private def runAcquire(): Unit = { - if (!isPaused.get() && lock.compareAndSet(false, true)) { - runRelease() + lazy val promise = { + val p = Promise[B] + p.future.onComplete { _ => unlock() } + p + } + if (!isPaused.get() && lock.compareAndSet(None, Some(promise))) { + runRelease(promise) } else { // Do nothing, the submitted arguments will be handled // by a separate request. } } - private def runRelease(): Unit = { + private def runRelease(p: Promise[B]): Unit = { // Pre-condition: lock is acquired. // Pos-condition: // - lock is released @@ -128,24 +148,29 @@ final class BatchedFunction[A, B]( this.current.set(result) val resultF = for { result <- result.future - _ <- Future { - callbacks.foreach(cb => cb()) - } + _ <- Future { callbacks.foreach(cb => cb()) } } yield result - resultF.onComplete { response => - unlock() - requests.foreach(_.result.complete(response)) + resultF.onComplete(p.tryComplete) + p.future.onComplete { + case Failure(_: BatchedFunction.BatchedFunctionCancelation) => + result.cancel() + requests.foreach(_.result.complete(defaultResult)) + case result => + requests.foreach(_.result.complete(result)) } } else { - unlock() + p.tryFailure(new BatchedFunction.BatchedFunctionCancelation) } } catch { case NonFatal(e) => unlock() - requests.foreach(_.result.failure(e)) + requests.foreach(_.result.tryFailure(e)) scribe.error(s"Unexpected error releasing buffered job", e) } } + + def defaultResult: Try[B] = + default.map(Success(_)).getOrElse(Failure(new CancellationException)) } object BatchedFunction { @@ -153,6 +178,7 @@ object BatchedFunction { fn: Seq[A] => Future[B], functionId: String, shouldLogQueue: Boolean = false, + default: Option[B] = None, )(implicit ec: ExecutionContext ): BatchedFunction[A, B] = @@ -160,5 +186,7 @@ object BatchedFunction { fn.andThen(CancelableFuture(_)), functionId, shouldLogQueue, + default, ) + class BatchedFunctionCancelation extends RuntimeException } diff --git a/metals/src/main/scala/scala/meta/internal/metals/BuildServerConnection.scala b/metals/src/main/scala/scala/meta/internal/metals/BuildServerConnection.scala index bb11df07f28..033946a70d1 100644 --- a/metals/src/main/scala/scala/meta/internal/metals/BuildServerConnection.scala +++ b/metals/src/main/scala/scala/meta/internal/metals/BuildServerConnection.scala @@ -19,6 +19,7 @@ import scala.concurrent.ExecutionContextExecutorService import scala.concurrent.Future import scala.concurrent.Promise import scala.reflect.ClassTag +import scala.util.Success import scala.util.Try import scala.meta.internal.builds.MillBuildTool @@ -73,7 +74,6 @@ class BuildServerConnection private ( private val ongoingRequests = new MutableCancelable().addAll(initialConnection.cancelables) - private val ongoingCompilations = new MutableCancelable() def version: String = _version.get() @@ -154,7 +154,6 @@ class BuildServerConnection private ( def compile(params: CompileParams): CompletableFuture[CompileResult] = { register( server => server.buildTargetCompile(params), - isCompile = true, onFail = Some( ( new CompileResult(StatusCode.CANCELLED), @@ -299,14 +298,9 @@ class BuildServerConnection private ( override def cancel(): Unit = { if (cancelled.compareAndSet(false, true)) { ongoingRequests.cancel() - ongoingCompilations.cancel() } } - def cancelCompilations(): Unit = { - ongoingCompilations.cancel() - } - private def askUser( original: Future[BuildServerConnection.LauncherConnection] ): Future[BuildServerConnection.LauncherConnection] = { @@ -356,9 +350,8 @@ class BuildServerConnection private ( private def register[T: ClassTag]( action: MetalsBuildServer => CompletableFuture[T], onFail: => Option[(T, String)] = None, - isCompile: Boolean = false, ): CompletableFuture[T] = { - + val localCancelable = new MutableCancelable() def runWithCanceling( launcherConnection: BuildServerConnection.LauncherConnection ): Future[T] = { @@ -366,14 +359,14 @@ class BuildServerConnection private ( val cancelable = Cancelable { () => Try(resultFuture.cancel(true)) } - if (isCompile) ongoingCompilations.add(cancelable) - else ongoingRequests.add(cancelable) + ongoingRequests.add(cancelable) + localCancelable.add(cancelable) val result = resultFuture.asScala result.onComplete { _ => - if (isCompile) ongoingCompilations.remove(cancelable) - else ongoingRequests.remove(cancelable) + ongoingRequests.remove(cancelable) + localCancelable.remove(cancelable) } result } @@ -410,7 +403,14 @@ class BuildServerConnection private ( Future.failed(new MetalsBspException(name, t)) }) } - CancelTokens.future(_ => actionFuture) + + CancelTokens.future { token => + token.onCancel().asScala.onComplete { + case Success(java.lang.Boolean.TRUE) => localCancelable.cancel() + case _ => + } + actionFuture + } } def isBuildServerResponsive: Future[Option[Boolean]] = { diff --git a/metals/src/main/scala/scala/meta/internal/metals/Compilations.scala b/metals/src/main/scala/scala/meta/internal/metals/Compilations.scala index 60e0762ec00..07f7ba8cdf4 100644 --- a/metals/src/main/scala/scala/meta/internal/metals/Compilations.scala +++ b/metals/src/main/scala/scala/meta/internal/metals/Compilations.scala @@ -32,12 +32,12 @@ final class Compilations( new BatchedFunction[ b.BuildTargetIdentifier, Map[BuildTargetIdentifier, b.CompileResult], - ](compile, "compileBatch", shouldLogQueue = true) + ](compile, "compileBatch", shouldLogQueue = true, Some(Map.empty)) private val cascadeBatch = new BatchedFunction[ b.BuildTargetIdentifier, Map[BuildTargetIdentifier, b.CompileResult], - ](compile, "cascadeBatch", shouldLogQueue = true) + ](compile, "cascadeBatch", shouldLogQueue = true, Some(Map.empty)) def pauseables: List[Pauseable] = List(compileBatch, cascadeBatch) private val isCompiling = TrieMap.empty[b.BuildTargetIdentifier, Boolean] @@ -115,15 +115,6 @@ final class Compilations( def cancel(): Unit = { cascadeBatch.cancelAll() compileBatch.cancelAll() - buildTargets.all - .flatMap { target => - buildTargets.buildServerOf(target.getId()) - } - .distinct - .foreach { conn => - conn.cancelCompilations() - } - } def recompileAll(): Future[Unit] = { diff --git a/metals/src/main/scala/scala/meta/internal/metals/MetalsLspService.scala b/metals/src/main/scala/scala/meta/internal/metals/MetalsLspService.scala index 4717fccd3d4..2046c1ce99b 100644 --- a/metals/src/main/scala/scala/meta/internal/metals/MetalsLspService.scala +++ b/metals/src/main/scala/scala/meta/internal/metals/MetalsLspService.scala @@ -2216,6 +2216,8 @@ class MetalsLspService( } def disconnectOldBuildServer(): Future[Unit] = { + compilations.cancel() + buildTargetClasses.cancel() diagnostics.reset() bspSession.foreach(connection => scribe.info(s"Disconnecting from ${connection.main.name} session...") diff --git a/metals/src/main/scala/scala/meta/internal/metals/debug/BuildTargetClasses.scala b/metals/src/main/scala/scala/meta/internal/metals/debug/BuildTargetClasses.scala index a116fbafccf..c6bd1d79d91 100644 --- a/metals/src/main/scala/scala/meta/internal/metals/debug/BuildTargetClasses.scala +++ b/metals/src/main/scala/scala/meta/internal/metals/debug/BuildTargetClasses.scala @@ -24,7 +24,11 @@ final class BuildTargetClasses( : TrieMap[b.BuildTargetIdentifier, b.JvmEnvironmentItem] = TrieMap.empty[b.BuildTargetIdentifier, b.JvmEnvironmentItem] val rebuildIndex: BatchedFunction[b.BuildTargetIdentifier, Unit] = - BatchedFunction.fromFuture(fetchClasses, "buildTargetClasses") + BatchedFunction.fromFuture( + fetchClasses, + "buildTargetClasses", + default = Some(()), + ) def classesOf(target: b.BuildTargetIdentifier): Classes = { index.getOrElse(target, new Classes) @@ -175,6 +179,10 @@ final class BuildTargetClasses( val name = NameTransformer.decode(names.last) descriptors.map(descriptor => Symbols.Global(prefix, descriptor(name))) } + + def cancel(): Unit = { + rebuildIndex.cancelAll() + } } sealed abstract class TestFramework(val canResolveChildren: Boolean) diff --git a/tests/unit/src/test/scala/tests/BatchedFunctionSuite.scala b/tests/unit/src/test/scala/tests/BatchedFunctionSuite.scala index 23f8e71fe29..e8917d435d2 100644 --- a/tests/unit/src/test/scala/tests/BatchedFunctionSuite.scala +++ b/tests/unit/src/test/scala/tests/BatchedFunctionSuite.scala @@ -1,10 +1,14 @@ package tests +import java.util.concurrent.Executors + import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.Success import scala.meta.internal.metals.BatchedFunction +import scala.meta.internal.metals.Cancelable +import scala.meta.internal.metals.CancelableFuture class BatchedFunctionSuite extends BaseSuite { test("batch") { @@ -95,11 +99,51 @@ class BatchedFunctionSuite extends BaseSuite { mkString.unpause() - assertDiffEqual(paused.value, None) - assertDiffEqual(paused2.value, None) - - val unpaused2 = mkString(List("a", "b")) - assertDiffEqual(unpaused2.value, Some(Success("ab"))) + for { + _ <- paused.failed + _ <- paused2.failed + res <- mkString(List("a", "b")) + _ = assertEquals(res, "ab") + } yield () + } + test("cancel2") { + val executorService = Executors.newFixedThreadPool(10) + val ec2 = ExecutionContext.fromExecutor(executorService) + var i = 1 + val stuckExample: BatchedFunction[String, String] = + new BatchedFunction( + (seq: Seq[String]) => { + seq.toList match { + case "loop" :: Nil => + val future = Future.apply { + while (i == 1) { + Thread.sleep(1) + } + "loop-result" + }(ec2) + CancelableFuture[String](future, Cancelable { () => i = 2 }) + case _ => + CancelableFuture[String]( + Future.successful("result"), + Cancelable.empty, + ) + } + }, + "stuck example", + default = Some("default"), + )(ec2) + val cancelled = stuckExample("loop") + assertEquals(i, 1) + assert(cancelled.value.isEmpty) + val normal = stuckExample("normal") + stuckExample.cancelCurrent() + for { + str <- cancelled + _ = assertEquals(i, 2) + _ = assertEquals(str, "default") + str <- normal + _ = assertEquals(str, "result") + } yield () } } From b1e86f0346aa43992cec1bbe670f18d56ce04a79 Mon Sep 17 00:00:00 2001 From: Katarzyna Marek Date: Wed, 26 Jul 2023 12:45:50 +0200 Subject: [PATCH 2/3] cancellation test --- .../internal/metals/BatchedFunction.scala | 1 - .../src/test/scala/tests/BillLspSuite.scala | 39 +++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/metals/src/main/scala/scala/meta/internal/metals/BatchedFunction.scala b/metals/src/main/scala/scala/meta/internal/metals/BatchedFunction.scala index a729ed1932c..3070cccbf25 100644 --- a/metals/src/main/scala/scala/meta/internal/metals/BatchedFunction.scala +++ b/metals/src/main/scala/scala/meta/internal/metals/BatchedFunction.scala @@ -30,7 +30,6 @@ final class BatchedFunction[A, B]( extends (Seq[A] => Future[B]) with Function2[Seq[A], () => Unit, Future[B]] with Pauseable { - val rand = new scala.util.Random /** * Call the function with the given arguments. diff --git a/tests/unit/src/test/scala/tests/BillLspSuite.scala b/tests/unit/src/test/scala/tests/BillLspSuite.scala index 5a50c4ad3ec..50371140af4 100644 --- a/tests/unit/src/test/scala/tests/BillLspSuite.scala +++ b/tests/unit/src/test/scala/tests/BillLspSuite.scala @@ -10,6 +10,7 @@ import scala.meta.internal.metals.ServerCommands import scala.meta.io.AbsolutePath import bill._ +import ch.epfl.scala.bsp4j.StatusCode class BillLspSuite extends BaseLspSuite("bill") { @@ -222,4 +223,42 @@ class BillLspSuite extends BaseLspSuite("bill") { Bill.installGlobal(globalBsp.toNIO, "Bob") testSelectServerDialogue() } + + test("cancel-compile") { + val cancelPattern = + """Sending notification '\$\/cancelRequest'\s*Params: \{\s*\"id\": \"([0-9]+)\"\s*\}""".r + cleanWorkspace() + Bill.installWorkspace(workspace.toNIO, "Bill") + def trace = workspace.resolve(".metals/bsp.trace.json").readText + for { + _ <- initialize( + """|/src/com/App.scala + |object App { + | val x: Int = 1 + |} + |/.metals/bsp.trace.json + |""".stripMargin + ) + (compileReport, _) <- server.server.compilations + .compileFile( + workspace.resolve("src/com/App.scala") + ) + .zip { + // wait until the compilation start + while (!trace.contains(s"buildTarget/compile")) { + Thread.sleep(1) + } + server.executeCommand(ServerCommands.CancelCompile) + } + _ = assertEquals(compileReport.getStatusCode(), StatusCode.CANCELLED) + currentTrace = trace + cancelId = cancelPattern.findFirstMatchIn(currentTrace).get.group(1) + _ = assert(currentTrace.contains(s"buildTarget/compile - ($cancelId)")) + compileReport <- server.server.compilations + .compileFile( + workspace.resolve("src/com/App.scala") + ) + _ = assertEquals(compileReport.getStatusCode(), StatusCode.OK) + } yield () + } } From d77b46a21beff9c65878b813e0590913341a8a55 Mon Sep 17 00:00:00 2001 From: Katarzyna Marek Date: Fri, 29 Sep 2023 08:42:54 +0200 Subject: [PATCH 3/3] debug test --- tests/unit/src/test/scala/tests/BillLspSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/unit/src/test/scala/tests/BillLspSuite.scala b/tests/unit/src/test/scala/tests/BillLspSuite.scala index 50371140af4..f3743ddbea1 100644 --- a/tests/unit/src/test/scala/tests/BillLspSuite.scala +++ b/tests/unit/src/test/scala/tests/BillLspSuite.scala @@ -252,7 +252,9 @@ class BillLspSuite extends BaseLspSuite("bill") { } _ = assertEquals(compileReport.getStatusCode(), StatusCode.CANCELLED) currentTrace = trace - cancelId = cancelPattern.findFirstMatchIn(currentTrace).get.group(1) + cancelMatch = cancelPattern.findFirstMatchIn(currentTrace) + _ = assert(cancelMatch.nonEmpty, trace) + cancelId = cancelMatch.get.group(1) _ = assert(currentTrace.contains(s"buildTarget/compile - ($cancelId)")) compileReport <- server.server.compilations .compileFile(