diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 0000000..90da557
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,53 @@
+name: CI
+on:
+  push:
+    branches:
+    - main
+    tags:
+    - "v*"
+  pull_request:
+
+jobs:
+  test:
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/checkout@v3
+      with:
+        fetch-depth: 0
+    - uses: coursier/cache-action@v6.3
+    - uses: VirtusLab/scala-cli-setup@v0.1
+    - name: Test
+      run: scala-cli test src --cross
+
+  # publish:
+  #   needs: test
+  #   if: github.event_name == 'push'
+  #   runs-on: ubuntu-latest
+  #   steps:
+  #   - uses: actions/checkout@v3
+  #     with:
+  #       fetch-depth: 0
+  #   - uses: coursier/cache-action@v6.3
+  #   - uses: VirtusLab/scala-cli-setup@v0.1
+  #   - name: Publish
+  #     run: scala-cli publish src --cross
+  #     env:
+  #       PUBLISH_USER: ${{ secrets.PUBLISH_USER }}
+  #       PUBLISH_PASSWORD: ${{ secrets.PUBLISH_PASSWORD }}
+  #       PUBLISH_SECRET_KEY: ${{ secrets.PUBLISH_SECRET_KEY }}
+  #       PUBLISH_SECRET_KEY_PASSWORD: ${{ secrets.PUBLISH_SECRET_KEY_PASSWORD }}
+
+  package:
+    needs: test
+    if: github.event_name == 'push'
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/checkout@v3
+      with:
+        fetch-depth: 0
+    - uses: coursier/cache-action@v6.3
+    - uses: VirtusLab/scala-cli-setup@v0.1
+    - name: Create and upload archives
+      run: scala-cli run Upload.scala
+      env:
+        UPLOAD_GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..b3c6c86
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.bsp/
+.scala-build/
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..7f448ac
--- /dev/null
+++ b/README.md
@@ -0,0 +1,37 @@
+# lightweight-spark-distrib
+
+*lightweight-spark-distrib* is a small application that makes Spark distributions more
+lightweight. From an existing Spark distribution, *lightweight-spark-distrib* looks for the
+JARs it contains and tries to find those on Maven Central. It then copies all files but the
+JARs it found on Maven Central to a new directory, and writes alongside them a script that
+relies on [coursier](https://github.com/coursier/coursier) to fetch the missing JARs.
+
+The resulting Spark distributions are much more lightweight (~25 MB uncompressed / ~16 MB compressed)
+than their original counterparts (which usually weigh more than 200 MB). As a consequence, they
+are easier to distribute, and benefit more easily from mechanisms such as CI caches.
+
+
+## Generate a lightweight archive
+
+```text
+$ scala-cli run \
+  --workspace . \
+  src \
+  -- \
+  --dest spark-3.0.3-bin-hadoop2.7-lightweight.tgz \
+  https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz \
+  --spark 3.0.3 \
+  --scala 2.12.10 \
+  --archive
+```
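+
+The generated distribution ships with a `fetch-jars.sh` script at its root, which re-downloads
+the missing JARs via coursier. A minimal usage sketch (the archive is created without a
+top-level directory, so it is best extracted into a dedicated one):
+
+```text
+$ mkdir spark-3.0.3-bin-hadoop2.7-lightweight
+$ tar -zxf spark-3.0.3-bin-hadoop2.7-lightweight.tgz -C spark-3.0.3-bin-hadoop2.7-lightweight
+$ cd spark-3.0.3-bin-hadoop2.7-lightweight
+$ ./fetch-jars.sh
+```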
diff --git a/Upload.scala b/Upload.scala
new file mode 100644
index 0000000..3b1cc9b
--- /dev/null
+++ b/Upload.scala
@@ -0,0 +1,41 @@
+//> using scala "2.13"
+//> using lib "io.github.alexarchambault.mill::mill-native-image-upload:0.1.21"
+//> using lib "com.lihaoyi::os-lib:0.8.1"
+
+object Upload {
+  private def create(scalaVersion: String, sparkVersion: String, sourceUrl: String, dest: os.Path): Unit =
+    os.proc("scala-cli", "run", "src", "--", "--force", "--dest", dest, "--archive", "--scala", scalaVersion, "--spark", sparkVersion, sourceUrl)
+      .call(stdin = os.Inherit, stdout = os.Inherit)
+  private def versions = Seq(
+    "2.12.15" -> "3.0.3",
+    "2.12.15" -> "2.4.2"
+  )
+  def main(args: Array[String]): Unit = {
+    val tag = os.proc("git", "tag", "--points-at", "HEAD").call().out.trim()
+    val dummy = tag.isEmpty
+    if (dummy)
+      System.err.println("Not on a git tag, running in dummy mode")
+    val token = Option(System.getenv("UPLOAD_GH_TOKEN")).getOrElse {
+      if (dummy) ""
+      else sys.error("UPLOAD_GH_TOKEN not set")
+    }
+    val files = versions.map {
+      case (scalaVer, sparkVer) =>
+        val url = s"https://archive.apache.org/dist/spark/spark-$sparkVer/spark-$sparkVer-bin-hadoop2.7.tgz"
+        val sbv = scalaVer.split('.').take(2).mkString(".")
+        val name = s"spark-$sparkVer-bin-hadoop2.7-scala$sbv"
+        val dest = os.temp(prefix = name, suffix = ".tgz")
+        create(scalaVer, sparkVer, url, dest)
+        dest -> s"$name.tgz"
+    }
+    if (!dummy)
+      io.github.alexarchambault.millnativeimage.upload.Upload.upload(
+        ghOrg = "scala-cli",
+        ghProj = "lightweight-spark-distrib",
+        ghToken = token,
+        tag = tag,
+        dryRun = false,
+        overwrite = true
+      )(files: _*)
+  }
+}
diff --git a/src/Convert.scala b/src/Convert.scala
new file mode 100644
index 0000000..304d73e
--- /dev/null
+++ b/src/Convert.scala
@@ -0,0 +1,285 @@
+//> using lib "com.github.alexarchambault::case-app:2.1.0-M14"
+//> using lib "com.lihaoyi::os-lib:0.8.1"
+//> using lib "com.lihaoyi::pprint:0.7.3"
+//> using lib "io.get-coursier::coursier:2.1.0-M5-24-g678b31710"
+//> using lib "io.get-coursier::dependency:0.2.2"
+//> using lib "org.apache.commons:commons-compress:1.21"
+//> using scala "2.13.8"
+
+//> using option "-Ywarn-unused"
+
+import caseapp.core.app.CaseApp
+import caseapp.core.RemainingArgs
+import coursier.cache.{ArchiveCache, FileCache}
+import coursier.core.Publication
+import coursier.error.ResolutionError
+import coursier.util.Artifact
+import coursier.{dependencyString => _, _}
+import dependency._
+import Util._
+
+import java.nio.charset.StandardCharsets
+
+import scala.collection.mutable
+import scala.util.control.NonFatal
+import scala.util.Properties
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream
+import scala.util.Using
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry
+
+object Convert extends CaseApp[ConvertOptions] {
+
+  // has to accept input like "hive-cli-2.3.7.jar", but also "scopt_2.12-3.7.1.jar" (Scala version suffix)
+  def moduleName(fileName: String): String = {
+    val f = fileName.stripSuffix(".jar")
+    val lastDashIdx = f.lastIndexOf('-')
+    if (lastDashIdx < 0) f
+    else {
+      val followedByVersion = lastDashIdx.until(f.length).forall { idx =>
+        val c = f(idx)
+        c.isDigit || c == '.' || c == '-'
+      }
+      if (followedByVersion)
+        moduleName(f.take(lastDashIdx))
+      else
+        f
+    }
+  }
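+
+  // e.g. moduleName("hive-cli-2.3.7.jar") == "hive-cli"
+  // and  moduleName("scopt_2.12-3.7.1.jar") == "scopt_2.12" (the Scala suffix stays in the module name)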
+
+  def versionFor(fileName: String): String =
+    fileName
+      .stripPrefix(moduleName(fileName))
+      .stripPrefix("-")
+      .stripSuffix(".jar")
+
+  def fetchJarVersion(dep: coursier.Dependency, pub: Publication, forceVersion: String): Option[String] = {
+    val dep0 = dep.withVersion(forceVersion).withTransitive(false).withPublication(pub)
+    val resOpt =
+      try Some(Fetch().addDependencies(dep0).runResult()(FileCache().ec))
+      catch {
+        case e: ResolutionError.CantDownloadModule if e.perRepositoryErrors.forall(_.startsWith("not found: ")) =>
+          None
+        case NonFatal(e) =>
+          throw new Exception(e)
+      }
+    resOpt.map(_.artifacts).getOrElse(Nil) match {
+      case Seq() =>
+        System.err.println(s"Error: could not get ${dep.module}:$forceVersion (no file found)")
+        None
+      case Seq((a, _)) =>
+        Some(a.url)
+      case Seq((a, _), _*) =>
+        System.err.println(s"Warning: got several files for ${dep.module}:$forceVersion (should not happen)")
+        Some(a.url)
+    }
+  }
+
+  def csShUrl = "https://github.com/coursier/ci-scripts/raw/master/cs.sh"
+
+  def run(options: ConvertOptions, args: RemainingArgs): Unit = {
+
+    val arg = args.all match {
+      case Seq() =>
+        System.err.println("Error: no argument passed (expected Spark distribution path or URL)")
+        System.err.println(finalHelp.usage(helpFormat))
+        sys.exit(1)
+      case Seq(arg) => arg
+      case _ =>
+        System.err.println("Error: too many arguments passed (expected one)")
+        System.err.println(finalHelp.usage(helpFormat))
+        sys.exit(1)
+    }
+
+    val distribPath =
+      if (arg.contains("://")) {
+        val cache = ArchiveCache()
+        val artifact = Artifact(arg).withChanging(options.changing)
+        cache.get(artifact).unsafeRun()(cache.cache.ec) match {
+          case Left(e) => throw new Exception(e)
+          case Right(f) => os.Path(f, os.pwd)
+        }
+      }
+      else
+        os.Path(arg, os.pwd)
+
+    val dest = os.Path(options.dest, os.pwd)
+
+    if (os.exists(dest)) {
+      if (options.force) {
+        System.err.println(s"${options.dest} already exists, removing it…")
+        os.remove.all(dest)
+      }
+      else {
+        System.err.println(s"${options.dest} already exists, pass --force to remove it.")
+        sys.exit(1)
+      }
+    }
+
+    val dirDest =
+      if (options.archive) os.temp.dir(prefix = "convert-spark-distrib")
+      else dest
+
+    convert(
+      distribPath,
+      dirDest,
+      options.scalaVersion,
+      options.sparkVersion
+    )
+
+    if (options.archive) {
+      Using.resource(os.write.outputStream(dest, createFolders = true)) { fos =>
+        Using.resource(new GzipCompressorOutputStream(fos)) { gzos =>
+          Using.resource(new TarArchiveOutputStream(gzos)) { taos =>
+            taos.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU)
+            for (p <- os.walk.stream(dirDest) if p != dirDest) {
+              val relPath = p.relativeTo(dirDest)
+              val ent = new TarArchiveEntry(p.toNIO, relPath.toString)
+              taos.putArchiveEntry(ent)
+              if (os.isFile(p))
+                taos.write(os.read.bytes(p))
+              taos.closeArchiveEntry()
+            }
+          }
+        }
+      }
+    }
+  }
+
+  def convert(
+    distribPath: os.Path,
+    dest: os.Path,
+    scalaVersionOpt: Option[String],
+    sparkVersionOpt: Option[String]
+  ): Unit = {
+
+    os.makeDir.all(dest)
+
+    val sparkVersion = sparkVersionOpt.map(_.trim).filter(_.nonEmpty).getOrElse {
+      sys.error("No Spark version specified (pass one with --spark)")
+    }
+    val scalaVersion = scalaVersionOpt.map(_.trim).filter(_.nonEmpty).getOrElse {
+      sys.error("No Scala version specified (pass one with --scala)")
+    }
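+    // note: both versions could plausibly be inferred from the distribution itself (e.g. from
+    // the spark-core_*.jar file name under jars/), but for now they have to be passed
+    // explicitly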
+
+    // FIXME Add more?
+    // (see "cs complete-dependency org.apache.spark: | grep '_2\.12$'"
+    // or `ls "$(cs get https://archive.apache.org/dist/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz --archive)"/*/jars | grep '^spark-'`)
+    val sparkModules = Seq(
+      "core",
+      "graphx",
+      "hive",
+      "hive-thriftserver",
+      "kubernetes",
+      "mesos",
+      "mllib",
+      "repl",
+      "sql",
+      "streaming",
+      "yarn"
+    )
+
+    val params = ScalaParameters(scalaVersion)
+    val sparkDependencies = sparkModules.map { mod =>
+      dep"org.apache.spark::spark-$mod:$sparkVersion".applyParams(params).toCs
+    }
+    val extraDependencies = Seq(
+      dep"com.github.scopt::scopt:3.7.1".applyParams(params).toCs
+    )
+    val dependencies = sparkDependencies ++ extraDependencies
+
+    System.err.println("Fetching Spark JARs via coursier")
+    val res = Fetch()
+      .addDependencies(dependencies: _*)
+      .runResult()(FileCache().ec)
+    System.err.println(s"Got ${res.files.length} JARs")
+
+    val map = res
+      .artifacts
+      .map {
+        case (a, f) =>
+          f.getName -> a.url
+      }
+      .toMap
+
+    val moduleNameMap = res
+      .fullDetailedArtifacts
+      .collect {
+        case (d, pub, a, Some(f)) =>
+          (moduleName(f.getName), (d, pub, a.url))
+      }
+      .toMap
+
+    val entries = new mutable.ListBuffer[(String, String)]
+
+    for (p <- os.walk.stream(distribPath)) {
+      val rel = p.relativeTo(distribPath)
+      if (os.isDir(p))
+        os.makeDir(dest / rel)
+      else if (rel.last.endsWith(".jar")) {
+        val urlOpt = map.get(rel.last).orElse {
+          moduleNameMap.get(moduleName(rel.last)).flatMap {
+            case (d, pub, _) =>
+              fetchJarVersion(d, pub, versionFor(rel.last))
+          }
+        }
+
+        urlOpt match {
+          case None =>
+            System.err.println(s"Warning: $rel (${moduleName(rel.last)}) not found")
+            // Is copyAttributes fine on Windows?
+            os.copy(p, dest / rel, copyAttributes = true)
+          case Some(url) =>
+            entries += url -> rel.toString
+        }
+      }
+      else
+        // Is copyAttributes fine on Windows?
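+        // (Files.copy's COPY_ATTRIBUTES option is best-effort and platform-dependent, so
+        // at worst some attributes are not carried over)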
+        os.copy(p, dest / rel, copyAttributes = true)
+    }
+
+    val csSh = FileCache().file(Artifact(csShUrl).withChanging(true)).run.unsafeRun()(FileCache().ec) match {
+      case Left(e) => throw new Exception(e)
+      case Right(f) => os.Path(f, os.pwd)
+    }
+
+    val fetchJarsDir = dest / "fetch-jars"
+    os.makeDir.all(fetchJarsDir)
+    os.copy(csSh, fetchJarsDir / "cs.sh")
+    if (!Properties.isWin)
+      os.perms.set(fetchJarsDir / "cs.sh", "rwxr-xr-x")
+
+    def fetchJarContent =
+      """#!/usr/bin/env bash
+        |set -eu
+        |
+        |cd "$(dirname "${BASH_SOURCE[0]}")"
+        |
+        |while read -r entry; do
+        |  url="$(echo "$entry" | sed 's/->.*$//')"
+        |  dest="$(echo "$entry" | sed 's/^.*->//')"
+        |  cp "$(./fetch-jars/cs.sh get "$url")" "$dest"
+        |done < fetch-jars/jar-urls
+        |""".stripMargin.getBytes(StandardCharsets.UTF_8)
+    os.write(dest / "fetch-jars.sh", fetchJarContent, perms = if (Properties.isWin) null else "rwxr-xr-x")
+
+    def entriesContent =
+      entries
+        .toList
+        .map {
+          case (url, dest) =>
+            s"$url->$dest${System.lineSeparator()}"
+        }
+        .mkString
+        .getBytes(StandardCharsets.UTF_8)
+    os.write(fetchJarsDir / "jar-urls", entriesContent)
+  }
+}
diff --git a/src/ConvertOptions.scala b/src/ConvertOptions.scala
new file mode 100644
index 0000000..888b903
--- /dev/null
+++ b/src/ConvertOptions.scala
@@ -0,0 +1,24 @@
+import caseapp._
+
+// format: off
+final case class ConvertOptions(
+  dest: String,
+  @HelpMessage("Whether the Spark distribution pointed at by the passed URL might change in the future")
+  changing: Boolean = false,
+  @HelpMessage("Erase destination path if it already exists")
+  force: Boolean = false,
+  @HelpMessage("Create an archive rather than a Spark distribution directory")
+  archive: Boolean = false,
+  @HelpMessage("Force Spark version")
+  @ExtraName("spark")
+  sparkVersion: Option[String] = None,
+  @HelpMessage("Force Scala version")
+  @ExtraName("scala")
+  scalaVersion: Option[String] = None
+)
+// format: on
+
+object ConvertOptions {
+  implicit lazy val parser: Parser[ConvertOptions] = Parser.derive
+  implicit lazy val help: Help[ConvertOptions] = Help.derive
+}
diff --git a/src/Util.scala b/src/Util.scala
new file mode 100644
index 0000000..00a7749
--- /dev/null
+++ b/src/Util.scala
@@ -0,0 +1,36 @@
+object Util {
+
+  // from https://github.com/VirtusLab/scala-cli/blob/b754d2afdda114e97febfb0090773cc582bafd19/modules/options/src/main/scala/scala/build/internals/Util.scala#L33-L57
+  implicit class ModuleOps(private val mod: dependency.Module) extends AnyVal {
+    def toCs: coursier.Module =
+      coursier.Module(
+        coursier.Organization(mod.organization),
+        coursier.ModuleName(mod.name),
+        mod.attributes
+      )
+  }
+  implicit class DependencyOps(private val dep: dependency.Dependency) extends AnyVal {
+    def toCs: coursier.Dependency = {
+      val mod = dep.module.toCs
+      var dep0 = coursier.Dependency(mod, dep.version)
+      if (dep.exclude.nonEmpty)
+        dep0 = dep0.withExclusions {
+          dep.exclude.toSet[dependency.Module].map { mod =>
+            (coursier.Organization(mod.organization), coursier.ModuleName(mod.name))
+          }
+        }
+      for (clOpt <- dep.userParams.get("classifier"); cl <- clOpt)
+        dep0 = dep0.withPublication(dep0.publication.withClassifier(coursier.core.Classifier(cl)))
+      for (tpeOpt <- dep.userParams.get("type"); tpe <- tpeOpt)
+        dep0 = dep0.withPublication(dep0.publication.withType(coursier.core.Type(tpe)))
+      for (extOpt <- dep.userParams.get("ext"); ext <- extOpt)
+        dep0 = dep0.withPublication(dep0.publication.withExt(coursier.core.Extension(ext)))
+      for (_ <- dep.userParams.get("intransitive"))
+        dep0 = dep0.withTransitive(false)
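+      // the "classifier", "type", "ext" and "intransitive" user params of the dependency
+      // library are mapped to their coursier equivalents above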
+      dep0
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/src/test/ConvertTests.scala b/src/test/ConvertTests.scala
new file mode 100644
index 0000000..700ab09
--- /dev/null
+++ b/src/test/ConvertTests.scala
@@ -0,0 +1,18 @@
+//> using lib "com.lihaoyi::utest::0.7.11"
+
+import utest._
+
+object ConvertTests extends TestSuite {
+
+  val tests = Tests {
+
+    test("simple") {
+      // check moduleName / versionFor on the file name shapes mentioned in Convert.scala
+      assert(Convert.moduleName("hive-cli-2.3.7.jar") == "hive-cli")
+      assert(Convert.moduleName("scopt_2.12-3.7.1.jar") == "scopt_2.12")
+      assert(Convert.versionFor("scopt_2.12-3.7.1.jar") == "3.7.1")
+    }
+
+  }
+
+}