feat: datasets compacting added (#8)
* feat: a new compact-jena command

* feat: the compacting tasks to be able to run periodically according to the given schedule

* feat: changes to the helm chart; admin pass to be read from shiro.ini

* chore: GH action that compiles the compactor
jachro authored Nov 23, 2023
1 parent 9216b08 commit 3954a62
Showing 16 changed files with 317 additions and 5 deletions.
15 changes: 14 additions & 1 deletion .github/workflows/publish.yaml
Original file line number Diff line number Diff line change
@@ -12,10 +12,23 @@ jobs:
- uses: rokroskar/[email protected]
env:
GITHUB_TOKEN: "${{ secrets.RENKUBOT_GITHUB_TOKEN }}"
compile-compactor:
name: Compile Compactor tool
runs-on: ubuntu-latest
needs: [ cleanup-previous-runs ]
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
- uses: coursier/[email protected]
- uses: VirtusLab/[email protected]
with:
jvm: temurin:21
- run: scala-cli compile compactor/src/* compactor/Compactor.scala
publish:
name: Publish
runs-on: ubuntu-latest
needs: [ cleanup-previous-runs ]
needs: [ compile-compactor ]
env:
GIT_USER: Renku Bot
GIT_EMAIL: [email protected]
1 change: 1 addition & 0 deletions .gitignore
@@ -103,6 +103,7 @@ project/plugins/project/
.cache
.lib/
.bsp/
.scala-build/


### Scala ###
24 changes: 21 additions & 3 deletions Dockerfile
@@ -77,6 +77,16 @@ RUN \
--output "${JAVA_MINIMAL}" \
--add-modules "${JDEPS},${JDEPS_EXTRA}"

COPY /compactor/src/*.scala /compactor/src/
COPY /compactor/Compactor.scala /compactor/

RUN \
apk add --no-cache wget coreutils && \
wget -q -O scala-cli.gz https://github.com/Virtuslab/scala-cli/releases/latest/download/scala-cli-x86_64-pc-linux-static.gz && gunzip scala-cli.gz && \
chmod +x scala-cli && \
mv scala-cli /usr/bin/ && \
scala-cli --power package /compactor/src /compactor/Compactor.scala -o /compactor/compact-jena --assembly

## -- Copying entrypoint.sh
COPY entrypoint.sh /

@@ -89,17 +99,21 @@ ARG FUSEKI_HOME
ARG FUSEKI_BASE
ARG JAVA_MINIMAL
ARG JVM_ARGS
ARG ADMIN_USER
ARG SHIRO_INI_LOCATION
ARG COMPACTING_SCHEDULE

RUN apk add --no-cache curl tini
RUN apk add --no-cache curl tini bash

COPY --from=build-stage /opt/java-minimal /opt/java-minimal
COPY --from=build-stage $FUSEKI_HOME $FUSEKI_HOME
COPY --from=build-stage /etc/passwd /etc/passwd
COPY --from=build-stage /entrypoint.sh /entrypoint.sh
COPY --from=build-stage /compactor/compact-jena /usr/bin/compact-jena

WORKDIR ${FUSEKI_HOME}

# Creating 'fuseki' system user to be used for starting the service
## Creating 'fuseki' system user to be used for starting the service
# -D : no password
ENV GID=1000
RUN adduser --disabled-password -g "$GID" -D -u 1000 -s /bin/sh -h ${FUSEKI_HOME} fuseki
@@ -121,13 +135,17 @@ ENV \
FUSEKI_HOME="${FUSEKI_HOME}" \
FUSEKI_BASE="${FUSEKI_BASE}" \
LOGGING="-Dlog4j.configurationFile=${FUSEKI_BASE}/log4j2.properties" \
PATH="${JAVA_MINIMAL}/bin:${FUSEKI_HOME}/bin:${PATH}"
PATH="${JAVA_MINIMAL}/bin:${FUSEKI_HOME}/bin:${PATH}" \
ADMIN_USER="${ADMIN_USER}" \
SHIRO_INI_LOCATION="${SHIRO_INI_LOCATION}" \
COMPACTING_SCHEDULE="${COMPACTING_SCHEDULE}"

USER fuseki

RUN \
rm -rf ${FUSEKI_BASE}/configuration/* && \
rm -rf ${FUSEKI_BASE}/databases/*

VOLUME ${FUSEKI_BASE}

EXPOSE 3030
8 changes: 8 additions & 0 deletions README.md
@@ -15,6 +15,12 @@ docker build --force-rm --build-arg JENA_VERSION=4.10.0 -t fuseki .

It's possible to build the image for `linux/amd64` and `linux/arm64` platforms.

In case the image will run on a `linux/amd64` platform but is built on a platform with a different architecture,
the docker build command could look as follows:
```
docker buildx build --platform linux/amd64 --force-rm --build-arg JENA_VERSION=4.10.0 -t fuseki .
```

## Running docker image

There are images built for `linux/amd64` and `linux/arm64` available on dockerhub. For available versions check [renku/renku-jena page](https://hub.docker.com/r/renku/renku-jena/tags).
@@ -40,6 +46,8 @@ There's a set of values that have to be configured in order to make Fuseki working
* `persistence.size` - size of the persistence volume; defaulted to 1Gi;
* `additionalEnvironmentVariables.JVM_ARGS` - flags to be passed to the JVM in the server startup command; it's common to pass some memory related settings to JVM, e.g. `-Xmx2048m -Xms2048m`.
* `requests.memory` - amount of memory requested for the container; should be higher than 'Xmx' if set on `additionalEnvironmentVariables.JVM_ARGS`;
* `compacting.ADMIN_USER` - username of an admin user who has access to the Fuseki Admin API, e.g. `admin` if the default `shiro.ini` is used.
* `compacting.COMPACTING_SCHEDULE` - an expression defining the schedule on which compacting runs, e.g. `Mon *-*-* 00:00:00`, meaning every Monday at 00:00:00. More details can be found at https://github.com/eikek/calev.

By default, the chart creates a Persistent Volume and mounts it at `$FUSEKI_BASE`. This folder is used by Fuseki for storing both the configuration and the data. It's crucial for `$FUSEKI_BASE` to be on a persistent volume, otherwise the configuration and the data will be lost on server shutdown. As mentioned above, it's recommended not to modify the mounting point but simply mount a volume to `/fuseki`.

26 changes: 26 additions & 0 deletions compactor/.scalafmt.conf
@@ -0,0 +1,26 @@
version = "3.7.15"

runner.dialect = "scala3"

maxColumn = 120
assumeStandardLibraryStripMargin = true
style = defaultWithAlign
lineEndings = unix
importSelectors = singleLine

docstrings.style = keep

project.git = true

align = most
align.tokens = [{code = "=>", owner = "Case|Type.Arg.ByName"}, "extends", ":", "=", "<-", "->", "-->", "%", "%%", "should", "shouldBe", "shouldEqual", "shouldNot", "asPrefix"]
align.arrowEnumeratorGenerator = true
align.openParenCallSite = true
align.openParenDefnSite = true

rewrite {
rules = [RedundantBraces, RedundantParens, SortModifiers, PreferCurlyFors]
sortModifiers.order = [
"private", "protected", "final", "implicit", "sealed", "abstract", "override", "lazy"
]
}
62 changes: 62 additions & 0 deletions compactor/Compactor.scala
@@ -0,0 +1,62 @@
//> using scala 3
//> using dep eu.timepit::fs2-cron-calev::0.8.3
//> using dep org.apache.shiro:shiro-core:1.13.0
//> using dep org.http4s::http4s-circe::0.23.24
//> using dep org.http4s::http4s-dsl::0.23.24
//> using dep org.http4s::http4s-ember-client::0.23.24
//> using dep org.slf4j:slf4j-simple:2.0.9
//> using dep org.typelevel::cats-core::2.10.0
//> using resourceDir src

import cats.effect.{ExitCode, IO, IOApp, Temporal}
import cats.syntax.all.*
import com.github.eikek.calev.CalEvent
import eu.timepit.fs2cron.calev.CalevScheduler
import fs2.Stream
import org.typelevel.log4cats.Logger

import java.time.Duration as JDuration
import scala.concurrent.duration.*

object Compactor extends IOApp with Logging:

def run(args: List[String]): IO[ExitCode] =
Config.readConfig
.fold(
err => Logger[IO].warn(s"Compacting skipped/failed: $err"),
config => schedule(compactDatasets(using config), config.schedule)
)
.as(ExitCode.Success)

private def schedule(task: IO[Unit], schedule: CalEvent) =
val scheduler = CalevScheduler.systemDefault[IO]
(scheduler.awakeEvery(schedule) >> Stream.eval(task)).compile.drain

private def compactDatasets(using config: Config) =
for {
datasets <- DatasetsFinder.findDatasets
_ <- datasets.map(compact).sequence
_ <- Logger[IO].info("Compacting finished.")
} yield ()

private def compact(dataset: String)(using config: Config) =
CompactionInitiator.kickOffCompaction(dataset) >>= {
case Left(err) =>
Logger[IO].error(s"Compacting '$dataset' failed; $err")
case Right(taskId) =>
Logger[IO].info(s"Compacting '$dataset' started; taskId = $taskId") >>
waitToFinish(taskId, dataset)
}

private def waitToFinish(taskId: TaskId, dataset: String)(using config: Config): IO[Unit] =
TaskStatusFinder.hasFinished(taskId) >>= {
case Left(err) => Logger[IO].error(err)
case Right(None) => Temporal[IO].delayBy(waitToFinish(taskId, dataset), 2.seconds)
case Right(Some(duration)) => Logger[IO].info(s"Compacting '$dataset' done in ${makeReadable(duration)}")
}

private lazy val makeReadable: JDuration => String = {
case duration if duration.toMillis < 3 * 1000 => s"${duration.toMillis}ms"
case duration if duration.toSeconds < 3 * 60 => s"${duration.toSeconds}s"
case duration => s"${duration.toMinutes}min"
}
28 changes: 28 additions & 0 deletions compactor/src/Admin.scala
@@ -0,0 +1,28 @@
import cats.syntax.all.*
import org.apache.shiro.config.Ini
import org.http4s.BasicCredentials
import org.http4s.headers.Authorization

final case class Admin(username: String, password: String):
lazy val asAuthHeader = Authorization(BasicCredentials(username, password))

object Admin:

def readConfig: Either[String, Admin] =
sys.env
.get(Config.adminUserEnv)
.toRight(s"No '${Config.adminUserEnv}' environment variable given")
.flatMap(username => readAdminPass(username).map(Admin.apply(username, _)))

private def readAdminPass(adminUserName: String): Either[String, String] =
val shiroFileName = "shiro.ini"
sys.env
.get(Config.shiroIniLocationEnv)
.toRight(s"No '${Config.shiroIniLocationEnv}' environment variable given")
.flatMap(path =>
Either
.catchNonFatal(Ini.fromResourcePath(path))
.leftMap(err => s"$shiroFileName couldn't be read from '$path': ${err.getMessage}")
)
.flatMap(ini => Option(ini.getSection("users")).toRight(s"No 'users' section in $shiroFileName"))
.flatMap(users => Option(users.get(adminUserName)).toRight(s"No '$adminUserName' in $shiroFileName"))
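`Admin.readAdminPass` leans on Shiro's `Ini` parser; the lookup it performs can be illustrated without the dependency. This is a hedged sketch — `ShiroUsers` is hypothetical and assumes the conventional shiro.ini `[users]` line format of `<user> = <password>, <role1>, ...`:

```scala
// Hypothetical, dependency-free version of the [users] lookup above.
// Assumes shiro.ini lines of the form "<user> = <password>, <roles...>".
object ShiroUsers {
  def password(ini: String, user: String): Either[String, String] = {
    val section = ini.linesIterator.map(_.trim)
      .dropWhile(_ != "[users]").drop(1)                       // skip to [users]
      .takeWhile(l => !(l.startsWith("[") && l.endsWith("]"))) // stop at next section
      .toList
    section
      .collectFirst {
        case l if l.takeWhile(_ != '=').trim == user =>
          l.dropWhile(_ != '=').drop(1).split(',').head.trim   // password is the first value
      }
      .toRight(s"No '$user' in shiro.ini")
  }

  def main(args: Array[String]): Unit =
    println(password("[users]\nadmin = secret, admin-role", "admin"))  // Right(secret)
}
```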
35 changes: 35 additions & 0 deletions compactor/src/CompactionInitiator.scala
@@ -0,0 +1,35 @@
import cats.effect.IO
import cats.syntax.all.*
import io.circe.Decoder
import org.http4s.Method.POST
import org.http4s.Status.Ok
import org.http4s.circe.CirceEntityDecoder.*
import org.http4s.{EntityDecoder, Headers, MediaType, Request, Response}

object CompactionInitiator:

def kickOffCompaction(ds: String)(using config: Config): IO[Either[String, TaskId]] =
HttpClient().use {
_.run {
Request[IO](POST,
(config.adminApi / "compact" / ds).withQueryParam("deleteOld", true),
headers = Headers(config.admin.asAuthHeader)
)
}.use(toTaskIdOrError)
}

private lazy val toTaskIdOrError: Response[IO] => IO[Either[String, TaskId]] = {
case resp
if resp.status == Ok &&
resp.contentType.exists(_.mediaType == MediaType.application.json) =>
resp.as[TaskId].map(_.asRight)
case resp =>
import EntityDecoder.text
resp.as[String].map(_.trim.asLeft)
}

private given Decoder[TaskId] =
_.downField("taskId").as[String].map(TaskId.apply)

final case class TaskId(value: String):
override lazy val toString: String = value
23 changes: 23 additions & 0 deletions compactor/src/Config.scala
@@ -0,0 +1,23 @@
import cats.syntax.all.*
import com.github.eikek.calev.CalEvent
import org.http4s.Uri
import org.http4s.implicits.*

final case class Config(adminApi: Uri, admin: Admin, schedule: CalEvent)

object Config:

private val compactingScheduleEnv = "COMPACTING_SCHEDULE"
val shiroIniLocationEnv = "SHIRO_INI_LOCATION"
val adminUserEnv = "ADMIN_USER"
val adminUri = uri"http://localhost:3030/$$"

def readConfig: Either[String, Config] =
(Admin.readConfig -> readCompactingSchedule)
.mapN(Config(adminUri, _, _))

private lazy val readCompactingSchedule =
sys.env
.get(compactingScheduleEnv)
.toRight(s"No '$compactingScheduleEnv' environment variable given")
.flatMap(CalEvent.parse)
19 changes: 19 additions & 0 deletions compactor/src/DatasetsFinder.scala
@@ -0,0 +1,19 @@
import cats.effect.IO
import io.circe.Decoder
import io.circe.Decoder.decodeList
import org.http4s.Method.GET
import org.http4s.circe.CirceEntityDecoder.*
import org.http4s.{Headers, Request}

object DatasetsFinder:

def findDatasets(using config: Config): IO[List[String]] =
HttpClient().use {
_.run {
Request[IO](GET, config.adminApi / "datasets", headers = Headers(config.admin.asAuthHeader))
}.use(_.as[List[String]])
}

private given Decoder[List[String]] =
val dsName: Decoder[String] = _.downField("ds.name").as[String]
_.downField("datasets").as(decodeList(dsName)).map(_.map(_.substring(1)))
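Fuseki's `/$/datasets` admin endpoint reports each `ds.name` with a leading slash (e.g. `/ds`), which is why the decoder ends with `substring(1)`. A minimal standalone sketch of that normalization (`DatasetNames` and the sample names are illustrative):

```scala
// Fuseki reports dataset names as "/<name>"; the compaction endpoint wants
// the bare name, hence stripping the first character as the decoder does.
object DatasetNames {
  def normalize(reported: List[String]): List[String] =
    reported.map(_.substring(1))

  def main(args: Array[String]): Unit =
    println(normalize(List("/ds", "/renku")))  // List(ds, renku)
}
```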
15 changes: 15 additions & 0 deletions compactor/src/HttpClient.scala
@@ -0,0 +1,15 @@
import cats.effect.{IO, Resource}
import org.http4s.client.Client
import org.http4s.ember.client.EmberClientBuilder

import scala.concurrent.duration.Duration

object HttpClient:

val resource: Resource[IO, Client[IO]] =
EmberClientBuilder
.default[IO]
.withIdleConnectionTime(Duration.Inf)
.build

def apply(): Resource[IO, Client[IO]] = resource
14 changes: 14 additions & 0 deletions compactor/src/Logging.scala
@@ -0,0 +1,14 @@
import cats.effect.IO
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

trait Logging:
Logging.configure
given Logger[IO] = Slf4jLogger.getLoggerFromName[IO]("Compactor")

object Logging:
val configure: Unit =
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true")
System.setProperty("org.slf4j.simpleLogger.dateTimeFormat", "[yyyy-MM-dd HH:mm:ss]")
System.setProperty("org.slf4j.simpleLogger.showThreadName", "false")
System.setProperty("org.slf4j.simpleLogger.showLogName", "true")
37 changes: 37 additions & 0 deletions compactor/src/TaskStatusFinder.scala
@@ -0,0 +1,37 @@
import cats.effect.IO
import cats.syntax.all.*
import io.circe.Decoder
import org.http4s.Method.GET
import org.http4s.Status.Ok
import org.http4s.circe.CirceEntityDecoder.*
import org.http4s.{Headers, Request, Response}

import java.time.{Duration, Instant}

object TaskStatusFinder:

def hasFinished(taskId: TaskId)(using config: Config): IO[Either[String, Option[Duration]]] =
HttpClient().use {
_.run {
Request[IO](GET, config.adminApi / "tasks" / taskId.value, headers = Headers(config.admin.asAuthHeader))
}.use(toDuration(taskId))
}

private def toDuration(taskId: TaskId): Response[IO] => IO[Either[String, Option[Duration]]] = {
case resp if resp.status == Ok =>
given Decoder[Either[String, Option[Duration]]] = decoder(taskId)
resp.as[Either[String, Option[Duration]]]
case resp =>
resp.as[String].map(_.trim.asLeft)
}

private def decoder(taskId: TaskId): Decoder[Either[String, Option[Duration]]] = cur =>
(
cur.downField("started").as[Option[Instant]],
cur.downField("finished").as[Option[Instant]],
cur.downField("success").as[Option[Boolean]]
).mapN {
case (None, _, _) => s"Task = $taskId not found".asLeft
case (Some(started), _, Some(false)) => s"Task = $taskId failed".asLeft
case (Some(started), maybeFinished, _) => maybeFinished.map(Duration.between(started, _)).asRight
}
2 changes: 1 addition & 1 deletion entrypoint.sh
@@ -21,4 +21,4 @@ if [[ ! -n "$FUSEKI_COMMAND" ]] ; then
export FUSEKI_COMMAND="--conf=${FUSEKI_BASE}/config.ttl"
fi

exec "${FUSEKI_HOME}/fuseki-server" "$FUSEKI_COMMAND"
/usr/bin/compact-jena & exec "${FUSEKI_HOME}/fuseki-server" "$FUSEKI_COMMAND"
