diff --git a/desktop/src/app/camera/camera.component.ts b/desktop/src/app/camera/camera.component.ts
index b499bc21c..6638ffa5f 100644
--- a/desktop/src/app/camera/camera.component.ts
+++ b/desktop/src/app/camera/camera.component.ts
@@ -179,7 +179,6 @@ export class CameraComponent implements AfterContentInit, OnDestroy {
}
readonly wait = {
- duration: 0,
remainingTime: 0,
progress: 0,
}
@@ -238,16 +237,10 @@ export class CameraComponent implements AfterContentInit, OnDestroy {
}
})
- electron.on('CAMERA_CAPTURE_STARTED', event => {
- if (event.camera.name === this.camera?.name) {
+ electron.on('CAMERA_DETACHED', event => {
+ if (event.device.name === this.camera?.name) {
ngZone.run(() => {
- this.capture.looping = event.looping
- this.capture.amount = event.exposureAmount
- this.capture.elapsedTime = 0
- this.capture.remainingTime = event.estimatedTime
- this.capture.progress = event.progress
- this.capturing = true
- this.waiting = false
+ this.connected = false
})
}
})
@@ -255,61 +248,27 @@ export class CameraComponent implements AfterContentInit, OnDestroy {
electron.on('CAMERA_CAPTURE_ELAPSED', event => {
if (event.camera.name === this.camera?.name) {
ngZone.run(() => {
- this.capture.elapsedTime = event.elapsedTime
- this.capture.remainingTime = event.remainingTime
- this.capture.progress = event.progress
- })
- }
- })
-
- electron.on('CAMERA_CAPTURE_WAITING', event => {
- if (event.camera.name === this.camera?.name) {
- ngZone.run(() => {
- this.wait.duration = event.waitDuration
- this.wait.remainingTime = event.remainingTime
- this.wait.progress = event.progress
- this.capturing = false
- this.waiting = true
- })
- }
- })
-
- electron.on('CAMERA_CAPTURE_FINISHED', event => {
- if (event.camera.name === this.camera?.name) {
- ngZone.run(() => {
- this.capturing = false
- this.waiting = false
- })
- }
- })
-
- electron.on('CAMERA_EXPOSURE_STARTED', event => {
- if (event.camera.name === this.camera?.name) {
- ngZone.run(() => {
- this.exposure.remainingTime = event.remainingTime
- this.exposure.progress = event.progress
- this.exposure.count = event.exposureCount
- this.capturing = true
- this.waiting = false
- })
- }
- })
-
- electron.on('CAMERA_EXPOSURE_ELAPSED', event => {
- if (event.camera.name === this.camera?.name) {
- ngZone.run(() => {
- this.exposure.remainingTime = event.remainingTime
- this.exposure.progress = event.progress
+ this.capture.elapsedTime = event.captureElapsedTime
+ this.capture.remainingTime = event.captureRemainingTime
+ this.capture.progress = event.captureProgress
+ this.exposure.remainingTime = event.exposureRemainingTime
+ this.exposure.progress = event.exposureProgress
this.exposure.count = event.exposureCount
- })
- }
- })
- electron.on('CAMERA_EXPOSURE_FINISHED', event => {
- if (event.camera.name === this.camera?.name) {
- ngZone.run(() => {
- this.exposure.remainingTime = event.remainingTime
- this.exposure.progress = event.progress
+ if (event.state === 'WAITING') {
+ this.wait.remainingTime = event.waitRemainingTime
+ this.wait.progress = event.waitProgress
+ this.waiting = true
+ } else if (event.state === 'CAPTURE_STARTED') {
+ this.capture.looping = event.exposureAmount <= 0
+ this.capture.amount = event.exposureAmount
+ this.capturing = true
+ } else if (event.state === 'CAPTURE_FINISHED') {
+ this.capturing = false
+ this.waiting = false
+ } else if (event.state === 'EXPOSURE_STARTED') {
+ this.waiting = false
+ }
})
}
})
@@ -336,7 +295,7 @@ export class CameraComponent implements AfterContentInit, OnDestroy {
const camera = await this.api.camera(this.camera.name)
Object.assign(this.camera, camera)
- await this.loadPreference()
+ this.loadPreference()
this.update()
} else {
this.app.subTitle = ''
@@ -422,18 +381,20 @@ export class CameraComponent implements AfterContentInit, OnDestroy {
}
private updateExposureUnit(unit: ExposureTimeUnit) {
- const a = CameraComponent.exposureUnitFactor(this.exposureTimeUnit)
- const b = CameraComponent.exposureUnitFactor(unit)
- const exposureTime = Math.trunc(this.exposureTime * b / a)
- const exposureTimeMin = Math.trunc(this.camera!.exposureMin * b / 60000000)
- const exposureTimeMax = Math.trunc(this.camera!.exposureMax * b / 60000000)
- this.exposureTimeMax = Math.max(1, exposureTimeMax)
- this.exposureTimeMin = Math.max(1, exposureTimeMin)
- this.exposureTime = Math.max(this.exposureTimeMin, Math.min(exposureTime, this.exposureTimeMax))
- this.exposureTimeUnit = unit
+ if (this.camera!.exposureMax) {
+ const a = CameraComponent.exposureUnitFactor(this.exposureTimeUnit)
+ const b = CameraComponent.exposureUnitFactor(unit)
+ const exposureTime = Math.trunc(this.exposureTime * b / a)
+ const exposureTimeMin = Math.trunc(this.camera!.exposureMin * b / 60000000)
+ const exposureTimeMax = Math.trunc(this.camera!.exposureMax * b / 60000000)
+ this.exposureTimeMax = Math.max(1, exposureTimeMax)
+ this.exposureTimeMin = Math.max(1, exposureTimeMin)
+ this.exposureTime = Math.max(this.exposureTimeMin, Math.min(exposureTime, this.exposureTimeMax))
+ this.exposureTimeUnit = unit
+ }
}
- private async update() {
+ private update() {
if (this.camera) {
this.connected = this.camera.connected
diff --git a/desktop/src/app/filterwheel/filterwheel.component.ts b/desktop/src/app/filterwheel/filterwheel.component.ts
index 1ec686822..c49045abd 100644
--- a/desktop/src/app/filterwheel/filterwheel.component.ts
+++ b/desktop/src/app/filterwheel/filterwheel.component.ts
@@ -64,6 +64,14 @@ export class FilterWheelComponent implements AfterContentInit, OnDestroy {
}
})
+ electron.on('WHEEL_DETACHED', event => {
+ if (event.device.name === this.wheel?.name) {
+ ngZone.run(() => {
+ this.connected = false
+ })
+ }
+ })
+
this.subscription = this.filterChangedPublisher
.pipe(throttleTime(1500))
.subscribe((filter) => {
@@ -127,7 +135,7 @@ export class FilterWheelComponent implements AfterContentInit, OnDestroy {
this.filterChangedPublisher.next(filter)
}
- private async update() {
+ private update() {
if (!this.wheel) {
return
}
diff --git a/desktop/src/app/focuser/focuser.component.ts b/desktop/src/app/focuser/focuser.component.ts
index 666b8fd6a..0f59124ce 100644
--- a/desktop/src/app/focuser/focuser.component.ts
+++ b/desktop/src/app/focuser/focuser.component.ts
@@ -55,6 +55,14 @@ export class FocuserComponent implements AfterViewInit, OnDestroy {
})
}
})
+
+ electron.on('FOCUSER_DETACHED', event => {
+ if (event.device.name === this.focuser?.name) {
+ ngZone.run(() => {
+ this.connected = false
+ })
+ }
+ })
}
async ngAfterViewInit() {
@@ -120,7 +128,7 @@ export class FocuserComponent implements AfterViewInit, OnDestroy {
this.api.focuserAbort(this.focuser!)
}
- private async update() {
+ private update() {
if (!this.focuser) {
return
}
diff --git a/desktop/src/app/home/home.component.ts b/desktop/src/app/home/home.component.ts
index 4246317df..67f3d3aaa 100644
--- a/desktop/src/app/home/home.component.ts
+++ b/desktop/src/app/home/home.component.ts
@@ -317,7 +317,9 @@ export class HomeComponent implements AfterContentInit, OnDestroy {
this.connected = await this.api.connectionStatus()
} catch {
this.connected = false
+ }
+ if (!this.connected) {
this.cameras = []
this.mounts = []
this.focusers = []
diff --git a/desktop/src/app/image/image.component.ts b/desktop/src/app/image/image.component.ts
index 95649657b..db7c88d8a 100644
--- a/desktop/src/app/image/image.component.ts
+++ b/desktop/src/app/image/image.component.ts
@@ -357,8 +357,8 @@ export class ImageComponent implements AfterViewInit, OnDestroy {
) {
app.title = 'Image'
- electron.on('CAMERA_EXPOSURE_FINISHED', async (event) => {
- if (event.camera.name === this.imageData.camera?.name) {
+ electron.on('CAMERA_CAPTURE_ELAPSED', async (event) => {
+ if (event.state === 'EXPOSURE_FINISHED' && event.camera.name === this.imageData.camera?.name) {
await this.closeImage()
ngZone.run(() => {
diff --git a/desktop/src/app/mount/mount.component.ts b/desktop/src/app/mount/mount.component.ts
index d89e8c1ee..fa4b049da 100644
--- a/desktop/src/app/mount/mount.component.ts
+++ b/desktop/src/app/mount/mount.component.ts
@@ -165,6 +165,14 @@ export class MountComponent implements AfterContentInit, OnDestroy {
}
})
+ electron.on('MOUNT_DETACHED', event => {
+ if (event.device.name === this.mount?.name) {
+ ngZone.run(() => {
+ this.connected = false
+ })
+ }
+ })
+
this.computeCoordinateSubscriptions[0] = this.computeCoordinatePublisher
.pipe(throttleTime(5000))
.subscribe(() => this.computeCoordinates())
@@ -322,7 +330,7 @@ export class MountComponent implements AfterContentInit, OnDestroy {
}
}
- private async update() {
+ private update() {
if (this.mount) {
this.connected = this.mount.connected
this.slewing = this.mount.slewing
diff --git a/desktop/src/shared/services/electron.service.ts b/desktop/src/shared/services/electron.service.ts
index 91a90c5a0..898781b3d 100644
--- a/desktop/src/shared/services/electron.service.ts
+++ b/desktop/src/shared/services/electron.service.ts
@@ -7,10 +7,9 @@ import * as childProcess from 'child_process'
import { ipcRenderer, webFrame } from 'electron'
import * as fs from 'fs'
import {
- ApiEventType, Camera, CameraCaptureElapsed, CameraCaptureFinished, CameraCaptureIsWaiting, CameraCaptureStarted,
- CameraExposureElapsed, CameraExposureFinished, CameraExposureStarted, DARVPolarAlignmentEvent, DARVPolarAlignmentGuidePulseElapsed,
- DARVPolarAlignmentInitialPauseElapsed, DeviceMessageEvent, FilterWheel, Focuser, GuideOutput, Guider,
- GuiderMessageEvent, HistoryStep, INDIMessageEvent, InternalEventType, Location, Mount, NotificationEvent, NotificationEventType, OpenDirectory, OpenFile
+ ApiEventType, Camera, CameraCaptureEvent, DARVEvent, DeviceMessageEvent, FilterWheel, Focuser,
+ GuideOutput, Guider, GuiderMessageEvent, HistoryStep, INDIMessageEvent, InternalEventType, Location, Mount, NotificationEvent,
+ NotificationEventType, OpenDirectory, OpenFile
} from '../types'
import { ApiService } from './api.service'
@@ -21,13 +20,7 @@ type EventMappedType = {
'CAMERA_UPDATED': DeviceMessageEvent
'CAMERA_ATTACHED': DeviceMessageEvent
'CAMERA_DETACHED': DeviceMessageEvent
- 'CAMERA_CAPTURE_STARTED': CameraCaptureStarted
- 'CAMERA_CAPTURE_FINISHED': CameraCaptureFinished
- 'CAMERA_CAPTURE_ELAPSED': CameraCaptureElapsed
- 'CAMERA_CAPTURE_WAITING': CameraCaptureIsWaiting
- 'CAMERA_EXPOSURE_ELAPSED': CameraExposureElapsed
- 'CAMERA_EXPOSURE_STARTED': CameraExposureStarted
- 'CAMERA_EXPOSURE_FINISHED': CameraExposureFinished
+ 'CAMERA_CAPTURE_ELAPSED': CameraCaptureEvent
'MOUNT_UPDATED': DeviceMessageEvent
'MOUNT_ATTACHED': DeviceMessageEvent
'MOUNT_DETACHED': DeviceMessageEvent
@@ -45,9 +38,7 @@ type EventMappedType = {
'GUIDER_UPDATED': GuiderMessageEvent
'GUIDER_STEPPED': GuiderMessageEvent
'GUIDER_MESSAGE_RECEIVED': GuiderMessageEvent
- 'DARV_POLAR_ALIGNMENT_STARTED': DARVPolarAlignmentEvent
- 'DARV_POLAR_ALIGNMENT_FINISHED': DARVPolarAlignmentEvent
- 'DARV_POLAR_ALIGNMENT_UPDATED': DARVPolarAlignmentInitialPauseElapsed | DARVPolarAlignmentGuidePulseElapsed
+ 'DARV_POLAR_ALIGNMENT_ELAPSED': DARVEvent
'DATA_CHANGED': any
'LOCATION_CHANGED': Location
'SKY_ATLAS_UPDATE_FINISHED': NotificationEvent
diff --git a/desktop/src/shared/types.ts b/desktop/src/shared/types.ts
index d98b65597..3a8506e1e 100644
--- a/desktop/src/shared/types.ts
+++ b/desktop/src/shared/types.ts
@@ -213,41 +213,16 @@ export interface CameraStartCapture {
export interface CameraCaptureEvent extends MessageEvent {
camera: Camera
- progress: number
-}
-
-export interface CameraCaptureStarted extends CameraCaptureEvent {
- looping: boolean
- exposureAmount: number
- exposureTime: number
- estimatedTime: number
-}
-
-export interface CameraCaptureFinished extends CameraCaptureEvent { }
-
-export interface CameraCaptureElapsed extends CameraCaptureEvent {
- exposureCount: number
- remainingTime: number
- elapsedTime: number
-}
-
-export interface CameraCaptureIsWaiting extends CameraCaptureEvent {
- waitDuration: number
- remainingTime: number
-}
-
-export interface CameraExposureEvent extends CameraCaptureEvent {
+ state: CameraCaptureState
exposureAmount: number
exposureCount: number
- exposureTime: number
- remainingTime: number
-}
-
-export interface CameraExposureStarted extends CameraExposureEvent { }
-
-export interface CameraExposureElapsed extends CameraExposureEvent { }
-
-export interface CameraExposureFinished extends CameraExposureEvent {
+ captureElapsedTime: number
+ captureProgress: number
+ captureRemainingTime: number
+ exposureProgress: number
+ exposureRemainingTime: number
+ waitRemainingTime: number
+ waitProgress: number
savePath?: string
}
@@ -487,22 +462,13 @@ export interface Satellite {
groups: SatelliteGroupType[]
}
-export interface DARVPolarAlignmentEvent extends MessageEvent {
+export interface DARVEvent extends MessageEvent {
camera: Camera
guideOutput: GuideOutput
remainingTime: number
progress: number
- state: DARVPolarAlignmentState
-}
-
-export interface DARVPolarAlignmentInitialPauseElapsed extends DARVPolarAlignmentEvent {
- pauseTime: number
- state: 'INITIAL_PAUSE'
-}
-
-export interface DARVPolarAlignmentGuidePulseElapsed extends DARVPolarAlignmentEvent {
- direction: GuideDirection
- state: 'FORWARD' | 'BACKWARD'
+ state: DARVState
+ direction?: GuideDirection
}
export interface CoordinateInterpolation {
@@ -783,7 +749,7 @@ export const API_EVENT_TYPES = [
'GUIDER_CONNECTED', 'GUIDER_DISCONNECTED', 'GUIDER_UPDATED', 'GUIDER_STEPPED',
'GUIDER_MESSAGE_RECEIVED',
// Polar Alignment.
- 'DARV_POLAR_ALIGNMENT_STARTED', 'DARV_POLAR_ALIGNMENT_FINISHED', 'DARV_POLAR_ALIGNMENT_UPDATED',
+ 'DARV_POLAR_ALIGNMENT_ELAPSED',
] as const
export type ApiEventType = (typeof API_EVENT_TYPES)[number]
@@ -802,7 +768,9 @@ export const NOTIFICATION_EVENT_TYPE = [
export type NotificationEventType = (typeof NOTIFICATION_EVENT_TYPE)[number]
-export type ImageSource = 'FRAMING' | 'PATH' | 'CAMERA'
+export type ImageSource = 'FRAMING' |
+ 'PATH' |
+ 'CAMERA'
export const HIPS_SURVEY_TYPES = [
'CDS_P_DSS2_NIR',
@@ -835,11 +803,18 @@ export const HIPS_SURVEY_TYPES = [
export type HipsSurveyType = (typeof HIPS_SURVEY_TYPES)[number]
-export type PierSide = 'EAST' | 'WEST' | 'NEITHER'
+export type PierSide = 'EAST' |
+ 'WEST' |
+ 'NEITHER'
-export type TargetCoordinateType = 'J2000' | 'JNOW'
+export type TargetCoordinateType = 'J2000' |
+ 'JNOW'
-export type TrackMode = 'SIDEREAL' | ' LUNAR' | 'SOLAR' | 'KING' | 'CUSTOM'
+export type TrackMode = 'SIDEREAL' |
+ ' LUNAR' |
+ 'SOLAR' |
+ 'KING' |
+ 'CUSTOM'
export type GuideDirection = 'NORTH' | // DEC+
'SOUTH' | // DEC-
@@ -888,10 +863,24 @@ export const GUIDE_STATES = [
export type GuideState = (typeof GUIDE_STATES)[number]
-export type Hemisphere = 'NORTHERN' | 'SOUTHERN'
+export type Hemisphere = 'NORTHERN' |
+ 'SOUTHERN'
+
+export type DARVState = 'IDLE' |
+ 'INITIAL_PAUSE' |
+ 'FORWARD' |
+ 'BACKWARD'
-export type DARVPolarAlignmentState = 'IDLE' | 'INITIAL_PAUSE' | 'FORWARD' | 'BACKWARD'
+export type GuiderPlotMode = 'RA/DEC' |
+ 'DX/DY'
-export type GuiderPlotMode = 'RA/DEC' | 'DX/DY'
+export type GuiderYAxisUnit = 'ARCSEC' |
+ 'PIXEL'
-export type GuiderYAxisUnit = 'ARCSEC' | 'PIXEL'
+export type CameraCaptureState = 'CAPTURE_STARTED' |
+ 'EXPOSURE_STARTED' |
+ 'EXPOSURING' |
+ 'WAITING' |
+ 'SETTLING' |
+ 'EXPOSURE_FINISHED' |
+ 'CAPTURE_FINISHED'
diff --git a/nebulosa-batch-processing/build.gradle.kts b/nebulosa-batch-processing/build.gradle.kts
new file mode 100644
index 000000000..73f4f81c5
--- /dev/null
+++ b/nebulosa-batch-processing/build.gradle.kts
@@ -0,0 +1,18 @@
+plugins {
+ kotlin("jvm")
+ id("maven-publish")
+}
+
+dependencies {
+ api(libs.rx)
+ implementation(project(":nebulosa-log"))
+ testImplementation(project(":nebulosa-test"))
+}
+
+publishing {
+ publications {
+ create("pluginMaven") {
+ from(components["java"])
+ }
+ }
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/AsyncJobLauncher.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/AsyncJobLauncher.kt
new file mode 100644
index 000000000..221b17265
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/AsyncJobLauncher.kt
@@ -0,0 +1,141 @@
+package nebulosa.batch.processing
+
+import nebulosa.log.loggerFor
+import java.time.LocalDateTime
+import java.util.concurrent.Executor
+
+open class AsyncJobLauncher(private val executor: Executor) : JobLauncher, StepInterceptor {
+
+ private val jobListeners = LinkedHashSet()
+ private val stepListeners = LinkedHashSet()
+ private val stepInterceptors = LinkedHashSet()
+ private val jobs = LinkedHashMap()
+
+ override var stepHandler: StepHandler = DefaultStepHandler
+
+ override fun registerJobExecutionListener(listener: JobExecutionListener): Boolean {
+ return jobListeners.add(listener)
+ }
+
+ override fun unregisterJobExecutionListener(listener: JobExecutionListener): Boolean {
+ return jobListeners.remove(listener)
+ }
+
+ override fun registerStepExecutionListener(listener: StepExecutionListener): Boolean {
+ return stepListeners.add(listener)
+ }
+
+ override fun unregisterStepExecutionListener(listener: StepExecutionListener): Boolean {
+ return stepListeners.remove(listener)
+ }
+
+ override fun registerStepInterceptor(interceptor: StepInterceptor): Boolean {
+ return stepInterceptors.add(interceptor)
+ }
+
+ override fun unregisterStepInterceptor(interceptor: StepInterceptor): Boolean {
+ return stepInterceptors.remove(interceptor)
+ }
+
+ override val size
+ get() = jobs.size
+
+ override fun contains(element: JobExecution): Boolean {
+ return jobs.containsValue(element)
+ }
+
+ override fun containsAll(elements: Collection): Boolean {
+ return elements.all { it in this }
+ }
+
+ override fun isEmpty(): Boolean {
+ return jobs.isEmpty()
+ }
+
+ override fun iterator(): Iterator {
+ return jobs.values.iterator()
+ }
+
+ @Synchronized
+ override fun launch(job: Job, executionContext: ExecutionContext?): JobExecution {
+ var jobExecution = jobs[job.id]
+
+ if (jobExecution != null) {
+ if (!jobExecution.isDone) {
+ return jobExecution
+ }
+ }
+
+ val interceptors = ArrayList(stepInterceptors.size + 1)
+ interceptors.addAll(stepInterceptors)
+ interceptors.add(this)
+
+ jobExecution = JobExecution(job, executionContext ?: ExecutionContext(), this, interceptors)
+
+ jobs[job.id] = jobExecution
+
+ executor.execute {
+ jobExecution.status = JobStatus.STARTED
+
+ job.beforeJob(jobExecution)
+ jobListeners.forEach { it.beforeJob(jobExecution) }
+
+ val stepJobListeners = LinkedHashSet()
+
+ try {
+ while (jobExecution.canContinue && job.hasNext(jobExecution)) {
+ val step = job.next(jobExecution)
+
+ if (step is JobExecutionListener) {
+ if (stepJobListeners.add(step)) {
+ step.beforeJob(jobExecution)
+ }
+ }
+
+ val result = stepHandler.handle(step, StepExecution(step, jobExecution))
+ result.get()
+ }
+
+ jobExecution.status = if (jobExecution.isStopping) JobStatus.STOPPED else JobStatus.COMPLETED
+ jobExecution.complete()
+ } catch (e: Throwable) {
+ LOG.error("job failed. job=$job, jobExecution=$jobExecution", e)
+ jobExecution.status = JobStatus.FAILED
+ jobExecution.completeExceptionally(e)
+ } finally {
+ jobExecution.finishedAt = LocalDateTime.now()
+ }
+
+ job.afterJob(jobExecution)
+ jobListeners.forEach { it.afterJob(jobExecution) }
+ stepJobListeners.forEach { it.afterJob(jobExecution) }
+ }
+
+ return jobExecution
+ }
+
+ override fun stop(mayInterruptIfRunning: Boolean) {
+ jobs.forEach { stop(it.value, mayInterruptIfRunning) }
+ }
+
+ override fun stop(jobExecution: JobExecution, mayInterruptIfRunning: Boolean) {
+ if (!jobExecution.isDone && !jobExecution.isStopping) {
+ jobExecution.status = JobStatus.STOPPING
+ jobExecution.job.stop(mayInterruptIfRunning)
+ }
+ }
+
+ override fun intercept(chain: StepChain): StepResult {
+ stepListeners.forEach { it.beforeStep(chain.stepExecution) }
+ val result = chain.step.execute(chain.stepExecution)
+ stepListeners.forEach { it.afterStep(chain.stepExecution) }
+ return result
+ }
+
+ override fun toString() = "AsyncJobLauncher"
+
+ companion object {
+
+ @JvmStatic private val LOG = loggerFor()
+ }
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/DefaultStepHandler.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/DefaultStepHandler.kt
new file mode 100644
index 000000000..248aea50f
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/DefaultStepHandler.kt
@@ -0,0 +1,39 @@
+package nebulosa.batch.processing
+
+import nebulosa.log.loggerFor
+
+object DefaultStepHandler : StepHandler {
+
+ @JvmStatic private val LOG = loggerFor()
+
+ override fun handle(step: Step, stepExecution: StepExecution): StepResult {
+ val jobLauncher = stepExecution.jobExecution.jobLauncher
+
+ when (step) {
+ is SplitStep -> {
+ step.beforeStep(stepExecution)
+ step.parallelStream().forEach { jobLauncher.stepHandler.handle(it, stepExecution) }
+ step.afterStep(stepExecution)
+ }
+ is FlowStep -> {
+ step.beforeStep(stepExecution)
+ step.forEach { jobLauncher.stepHandler.handle(it, stepExecution) }
+ step.afterStep(stepExecution)
+ }
+ else -> {
+ val chain = StepInterceptorChain(stepExecution.jobExecution.stepInterceptors, step, stepExecution)
+
+ LOG.info("step started. step={}, context={}", step, stepExecution.context)
+
+ while (stepExecution.jobExecution.canContinue) {
+ val status = chain.proceed().get()
+ if (status != RepeatStatus.CONTINUABLE) break
+ }
+
+ LOG.info("step finished. step={}, context={}", step, stepExecution.context)
+ }
+ }
+
+ return StepResult.FINISHED
+ }
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/ExecutionContext.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/ExecutionContext.kt
new file mode 100644
index 000000000..0525cbad9
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/ExecutionContext.kt
@@ -0,0 +1,10 @@
+package nebulosa.batch.processing
+
+import java.util.concurrent.ConcurrentHashMap
+
+open class ExecutionContext : ConcurrentHashMap {
+
+ constructor(initialCapacity: Int = 64) : super(initialCapacity)
+
+ constructor(context: ExecutionContext) : super(context)
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/FlowStep.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/FlowStep.kt
new file mode 100644
index 000000000..ae3f1a49f
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/FlowStep.kt
@@ -0,0 +1,12 @@
+package nebulosa.batch.processing
+
+interface FlowStep : Step, StepExecutionListener, Collection {
+
+ override fun execute(stepExecution: StepExecution): StepResult {
+ return stepExecution.jobExecution.jobLauncher.stepHandler.handle(this, stepExecution)
+ }
+
+ override fun stop(mayInterruptIfRunning: Boolean) {
+ forEach { it.stop(mayInterruptIfRunning) }
+ }
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/Job.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/Job.kt
new file mode 100644
index 000000000..11dcf486a
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/Job.kt
@@ -0,0 +1,14 @@
+package nebulosa.batch.processing
+
+interface Job : JobExecutionListener, Stoppable {
+
+ val id: String
+
+ fun hasNext(jobExecution: JobExecution): Boolean
+
+ fun next(jobExecution: JobExecution): Step
+
+ override fun beforeJob(jobExecution: JobExecution) = Unit
+
+ override fun afterJob(jobExecution: JobExecution) = Unit
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/JobExecution.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/JobExecution.kt
new file mode 100644
index 000000000..7dfed6d38
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/JobExecution.kt
@@ -0,0 +1,58 @@
+package nebulosa.batch.processing
+
+import java.time.LocalDateTime
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.TimeUnit
+
+data class JobExecution(
+ val job: Job,
+ val context: ExecutionContext,
+ val jobLauncher: JobLauncher,
+ val stepInterceptors: List,
+ val startedAt: LocalDateTime = LocalDateTime.now(),
+ var status: JobStatus = JobStatus.STARTING,
+ var finishedAt: LocalDateTime? = null,
+) {
+
+ @JvmField internal val completable = CompletableFuture()
+
+ inline val jobId
+ get() = job.id
+
+ inline val canContinue
+ get() = status == JobStatus.STARTED
+
+ inline val isStopping
+ get() = status == JobStatus.STOPPING
+
+ inline val isStopped
+ get() = status == JobStatus.STOPPED
+
+ inline val isCompleted
+ get() = status == JobStatus.COMPLETED
+
+ inline val isFailed
+ get() = status == JobStatus.FAILED
+
+ val isDone
+ get() = isCompleted || isFailed || isStopped
+
+ fun waitForCompletion(timeout: Long = 0L, unit: TimeUnit = TimeUnit.MILLISECONDS): Boolean {
+ try {
+ if (timeout <= 0L) completable.get()
+ else return completable.get(timeout, unit)
+ return true
+ } catch (e: ExecutionException) {
+ throw e.cause ?: e
+ }
+ }
+
+ internal fun complete() {
+ completable.complete(true)
+ }
+
+ internal fun completeExceptionally(e: Throwable) {
+ completable.completeExceptionally(e)
+ }
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/JobExecutionListener.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/JobExecutionListener.kt
new file mode 100644
index 000000000..70508ddc9
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/JobExecutionListener.kt
@@ -0,0 +1,8 @@
+package nebulosa.batch.processing
+
+interface JobExecutionListener {
+
+ fun beforeJob(jobExecution: JobExecution) = Unit
+
+ fun afterJob(jobExecution: JobExecution) = Unit
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/JobLauncher.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/JobLauncher.kt
new file mode 100644
index 000000000..fa00440a7
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/JobLauncher.kt
@@ -0,0 +1,22 @@
+package nebulosa.batch.processing
+
+interface JobLauncher : Collection, Stoppable {
+
+ val stepHandler: StepHandler
+
+ fun registerJobExecutionListener(listener: JobExecutionListener): Boolean
+
+ fun unregisterJobExecutionListener(listener: JobExecutionListener): Boolean
+
+ fun registerStepExecutionListener(listener: StepExecutionListener): Boolean
+
+ fun unregisterStepExecutionListener(listener: StepExecutionListener): Boolean
+
+ fun registerStepInterceptor(interceptor: StepInterceptor): Boolean
+
+ fun unregisterStepInterceptor(interceptor: StepInterceptor): Boolean
+
+ fun launch(job: Job, executionContext: ExecutionContext? = null): JobExecution
+
+ fun stop(jobExecution: JobExecution, mayInterruptIfRunning: Boolean = true)
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/JobStatus.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/JobStatus.kt
new file mode 100644
index 000000000..005aa08e2
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/JobStatus.kt
@@ -0,0 +1,11 @@
+package nebulosa.batch.processing
+
+enum class JobStatus {
+ STARTING,
+ STARTED,
+ STOPPING,
+ STOPPED,
+ FAILED,
+ COMPLETED,
+ ABANDONED,
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/PublishSubscribe.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/PublishSubscribe.kt
new file mode 100644
index 000000000..a2a26aa00
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/PublishSubscribe.kt
@@ -0,0 +1,44 @@
+package nebulosa.batch.processing
+
+import io.reactivex.rxjava3.core.Observable
+import io.reactivex.rxjava3.core.ObservableSource
+import io.reactivex.rxjava3.core.Observer
+import io.reactivex.rxjava3.disposables.Disposable
+import io.reactivex.rxjava3.functions.Consumer
+import io.reactivex.rxjava3.subjects.Subject
+import java.io.Closeable
+
+interface PublishSubscribe : ObservableSource, Observer, Closeable {
+
+ val subject: Subject
+
+ fun Observable.transform() = this
+
+ fun subscribe(onNext: Consumer): Disposable {
+ return subject.transform().subscribe(onNext)
+ }
+
+ override fun subscribe(observer: Observer) {
+ return subject.transform().subscribe(observer)
+ }
+
+ override fun onSubscribe(disposable: Disposable) {
+ subject.onSubscribe(disposable)
+ }
+
+ override fun onNext(event: T) {
+ subject.onNext(event)
+ }
+
+ override fun onError(e: Throwable) {
+ subject.onError(e)
+ }
+
+ override fun onComplete() {
+ subject.onComplete()
+ }
+
+ override fun close() {
+ onComplete()
+ }
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/RepeatStatus.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/RepeatStatus.kt
new file mode 100644
index 000000000..aa45a61ba
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/RepeatStatus.kt
@@ -0,0 +1,6 @@
+package nebulosa.batch.processing
+
+enum class RepeatStatus {
+ CONTINUABLE,
+ FINISHED,
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/SimpleFlowStep.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/SimpleFlowStep.kt
new file mode 100644
index 000000000..7f45b613f
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/SimpleFlowStep.kt
@@ -0,0 +1,10 @@
+package nebulosa.batch.processing
+
+open class SimpleFlowStep : FlowStep, ArrayList {
+
+ constructor(initialCapacity: Int = 4) : super(initialCapacity)
+
+ constructor(steps: Collection) : super(steps)
+
+ constructor(vararg steps: Step) : this(steps.toList())
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/SimpleJob.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/SimpleJob.kt
new file mode 100644
index 000000000..9b062638a
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/SimpleJob.kt
@@ -0,0 +1,36 @@
+package nebulosa.batch.processing
+
+abstract class SimpleJob : Job, ArrayList {
+
+ constructor(initialCapacity: Int = 4) : super(initialCapacity)
+
+ constructor(steps: Collection) : super(steps)
+
+ constructor(vararg steps: Step) : this(steps.toList())
+
+ @Volatile private var position = 0
+ @Volatile private var stopped = false
+
+ override fun hasNext(jobExecution: JobExecution): Boolean {
+ return !stopped && position < size
+ }
+
+ override fun next(jobExecution: JobExecution): Step {
+ return this[position++]
+ }
+
+ override fun stop(mayInterruptIfRunning: Boolean) {
+ if (stopped) return
+
+ stopped = true
+
+ if (position in 1..size) {
+ this[position - 1].stop(mayInterruptIfRunning)
+ }
+ }
+
+ fun reset() {
+ stopped = false
+ position = 0
+ }
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/SimpleSplitStep.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/SimpleSplitStep.kt
new file mode 100644
index 000000000..ba17430ae
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/SimpleSplitStep.kt
@@ -0,0 +1,10 @@
+package nebulosa.batch.processing
+
+open class SimpleSplitStep : SimpleFlowStep, SplitStep {
+
+ constructor(initialCapacity: Int = 4) : super(initialCapacity)
+
+ constructor(steps: Collection) : super(steps)
+
+ constructor(vararg steps: Step) : this(steps.toList())
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/SplitStep.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/SplitStep.kt
new file mode 100644
index 000000000..a91bc8fa1
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/SplitStep.kt
@@ -0,0 +1,3 @@
+package nebulosa.batch.processing
+
+interface SplitStep : FlowStep
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/Step.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/Step.kt
new file mode 100644
index 000000000..e9a5d7ce3
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/Step.kt
@@ -0,0 +1,8 @@
+package nebulosa.batch.processing
+
+interface Step : Stoppable {
+
+ fun execute(stepExecution: StepExecution): StepResult
+
+ override fun stop(mayInterruptIfRunning: Boolean) = Unit
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepChain.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepChain.kt
new file mode 100644
index 000000000..4e4d97cb4
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepChain.kt
@@ -0,0 +1,10 @@
+package nebulosa.batch.processing
+
+interface StepChain {
+
+ val step: Step
+
+ val stepExecution: StepExecution
+
+ fun proceed(): StepResult
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepExecution.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepExecution.kt
new file mode 100644
index 000000000..56538f1b7
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepExecution.kt
@@ -0,0 +1,10 @@
+package nebulosa.batch.processing
+
+data class StepExecution(
+ val step: Step,
+ val jobExecution: JobExecution,
+) {
+
+ inline val context
+ get() = jobExecution.context
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepExecutionListener.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepExecutionListener.kt
new file mode 100644
index 000000000..009528698
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepExecutionListener.kt
@@ -0,0 +1,8 @@
+package nebulosa.batch.processing
+
+interface StepExecutionListener {
+
+ fun beforeStep(stepExecution: StepExecution) = Unit
+
+ fun afterStep(stepExecution: StepExecution) = Unit
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepHandler.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepHandler.kt
new file mode 100644
index 000000000..0e6ad8bd0
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepHandler.kt
@@ -0,0 +1,6 @@
+package nebulosa.batch.processing
+
+interface StepHandler {
+
+ fun handle(step: Step, stepExecution: StepExecution): StepResult
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepInterceptor.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepInterceptor.kt
new file mode 100644
index 000000000..80759da03
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepInterceptor.kt
@@ -0,0 +1,6 @@
+package nebulosa.batch.processing
+
+interface StepInterceptor {
+
+ fun intercept(chain: StepChain): StepResult
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepInterceptorChain.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepInterceptorChain.kt
new file mode 100644
index 000000000..d1ed9a905
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepInterceptorChain.kt
@@ -0,0 +1,15 @@
+package nebulosa.batch.processing
+
+data class StepInterceptorChain(
+ private val interceptors: List,
+ override val step: Step,
+ override val stepExecution: StepExecution,
+ private val index: Int = 0,
+) : StepChain {
+
+ override fun proceed(): StepResult {
+ val next = StepInterceptorChain(interceptors, step, stepExecution, index + 1)
+ val interceptor = interceptors[index]
+ return interceptor.intercept(next)
+ }
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepResult.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepResult.kt
new file mode 100644
index 000000000..e3d044cd8
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/StepResult.kt
@@ -0,0 +1,23 @@
+package nebulosa.batch.processing
+
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.Future
+
+data class StepResult(@JvmField internal val completable: CompletableFuture) : Future by completable {
+
+ constructor() : this(CompletableFuture())
+
+ fun complete(status: RepeatStatus): Boolean {
+ return completable.complete(status)
+ }
+
+ fun completeExceptionally(e: Throwable): Boolean {
+ return completable.completeExceptionally(e)
+ }
+
+ companion object {
+
+ @JvmStatic val CONTINUABLE = StepResult(CompletableFuture.completedFuture(RepeatStatus.CONTINUABLE))
+ @JvmStatic val FINISHED = StepResult(CompletableFuture.completedFuture(RepeatStatus.FINISHED))
+ }
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/Stoppable.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/Stoppable.kt
new file mode 100644
index 000000000..c7c7f48a2
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/Stoppable.kt
@@ -0,0 +1,6 @@
+package nebulosa.batch.processing
+
+interface Stoppable {
+
+ fun stop(mayInterruptIfRunning: Boolean = true)
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/delay/DelayStep.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/delay/DelayStep.kt
new file mode 100644
index 000000000..894f28ad7
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/delay/DelayStep.kt
@@ -0,0 +1,65 @@
+package nebulosa.batch.processing.delay
+
+import nebulosa.batch.processing.*
+import java.time.Duration
+
+data class DelayStep(@JvmField val duration: Duration) : Step, JobExecutionListener {
+
+ private val listeners = LinkedHashSet()
+
+ @Volatile private var aborted = false
+
+ fun registerDelayStepListener(listener: DelayStepListener) {
+ listeners.add(listener)
+ }
+
+ fun unregisterDelayStepListener(listener: DelayStepListener) {
+ listeners.remove(listener)
+ }
+
+ override fun execute(stepExecution: StepExecution): StepResult {
+ var remainingTime = duration
+
+ if (!aborted && remainingTime > Duration.ZERO) {
+ while (!aborted && remainingTime > Duration.ZERO) {
+ val waitTime = minOf(remainingTime, DELAY_INTERVAL)
+
+ if (waitTime > Duration.ZERO) {
+ stepExecution.context[REMAINING_TIME] = remainingTime
+ stepExecution.context[WAIT_TIME] = waitTime
+
+ val progress = (duration.toNanos() - remainingTime.toNanos()) / duration.toNanos().toDouble()
+ stepExecution.context[PROGRESS] = progress
+
+ listeners.forEach { it.onDelayElapsed(this, stepExecution) }
+ Thread.sleep(waitTime.toMillis())
+ remainingTime -= waitTime
+ }
+ }
+
+ stepExecution.context[REMAINING_TIME] = Duration.ZERO
+ stepExecution.context[WAIT_TIME] = Duration.ZERO
+
+ listeners.forEach { it.onDelayElapsed(this, stepExecution) }
+ }
+
+ return StepResult.FINISHED
+ }
+
+ override fun stop(mayInterruptIfRunning: Boolean) {
+ aborted = true
+ }
+
+ override fun afterJob(jobExecution: JobExecution) {
+ listeners.clear()
+ }
+
+ companion object {
+
+ const val REMAINING_TIME = "DELAY.REMAINING_TIME"
+ const val WAIT_TIME = "DELAY.WAIT_TIME"
+ const val PROGRESS = "DELAY.PROGRESS"
+
+ @JvmField val DELAY_INTERVAL = Duration.ofMillis(500)!!
+ }
+}
diff --git a/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/delay/DelayStepListener.kt b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/delay/DelayStepListener.kt
new file mode 100644
index 000000000..04847c947
--- /dev/null
+++ b/nebulosa-batch-processing/src/main/kotlin/nebulosa/batch/processing/delay/DelayStepListener.kt
@@ -0,0 +1,8 @@
+package nebulosa.batch.processing.delay
+
+import nebulosa.batch.processing.StepExecution
+
+fun interface DelayStepListener {
+
+ fun onDelayElapsed(step: DelayStep, stepExecution: StepExecution)
+}
diff --git a/nebulosa-batch-processing/src/test/kotlin/BatchProcessingTest.kt b/nebulosa-batch-processing/src/test/kotlin/BatchProcessingTest.kt
new file mode 100644
index 000000000..d9cb0d00b
--- /dev/null
+++ b/nebulosa-batch-processing/src/test/kotlin/BatchProcessingTest.kt
@@ -0,0 +1,146 @@
+import io.kotest.core.spec.style.StringSpec
+import io.kotest.matchers.booleans.shouldBeTrue
+import io.kotest.matchers.longs.shouldBeInRange
+import io.kotest.matchers.shouldBe
+import nebulosa.batch.processing.*
+import nebulosa.log.loggerFor
+import java.util.concurrent.Executors
+import kotlin.concurrent.thread
+
+class BatchProcessingTest : StringSpec() {
+
+ init {
+ val launcher = AsyncJobLauncher(Executors.newSingleThreadExecutor())
+
+ "single" {
+ val startedAt = System.currentTimeMillis()
+ val jobExecution = launcher.launch(MathJob(listOf(SumStep())))
+ jobExecution.waitForCompletion()
+ jobExecution.context["VALUE"] shouldBe 1.0
+ (System.currentTimeMillis() - startedAt) shouldBeInRange (1000L..2000L)
+ }
+ "multiple" {
+ val startedAt = System.currentTimeMillis()
+ val jobExecution = launcher.launch(MathJob(listOf(SumStep(), SumStep())))
+ jobExecution.waitForCompletion()
+ jobExecution.context["VALUE"] shouldBe 2.0
+ (System.currentTimeMillis() - startedAt) shouldBeInRange (2000L..3000L)
+ }
+ "split" {
+ val startedAt = System.currentTimeMillis()
+ val jobExecution = launcher.launch(MathJob(listOf(SplitSumStep())))
+ jobExecution.waitForCompletion()
+ jobExecution.context["VALUE"] shouldBe N.toDouble()
+ (System.currentTimeMillis() - startedAt) shouldBeInRange (1000L..2000L)
+ }
+ "flow" {
+ val startedAt = System.currentTimeMillis()
+ val jobExecution = launcher.launch(MathJob(listOf(FlowSumStep())))
+ jobExecution.waitForCompletion()
+ jobExecution.context["VALUE"] shouldBe N.toDouble()
+ (System.currentTimeMillis() - startedAt) shouldBeInRange (N * 1000L..(N + 1) * 1000L)
+ }
+ "split flow" {
+ val startedAt = System.currentTimeMillis()
+ val jobExecution = launcher.launch(MathJob(listOf(SimpleSplitStep(FlowSumStep(), FlowSumStep()))))
+ jobExecution.waitForCompletion()
+ jobExecution.context["VALUE"] shouldBe (N * 2).toDouble()
+ (System.currentTimeMillis() - startedAt) shouldBeInRange (N * 1000L..(N + 1) * 1000L)
+ }
+ "stop" {
+ val startedAt = System.currentTimeMillis()
+ val jobExecution = launcher.launch(MathJob((0..7).map { SumStep() }))
+ thread { Thread.sleep(4000); launcher.stop(jobExecution) }
+ jobExecution.waitForCompletion()
+ jobExecution.context["VALUE"] shouldBe 3.0
+ jobExecution.isStopped.shouldBeTrue()
+ (System.currentTimeMillis() - startedAt) shouldBeInRange (4000L..5000L)
+ }
+ "repeatable" {
+ val startedAt = System.currentTimeMillis()
+ val jobExecution = launcher.launch(MathJob(listOf(SumStep()), 10.0))
+ jobExecution.waitForCompletion()
+ jobExecution.context["VALUE"] shouldBe 20.0
+ (System.currentTimeMillis() - startedAt) shouldBeInRange (10000L..11000L)
+ }
+ }
+
+ private class MathJob(
+ steps: List,
+ private val initialValue: Double = 0.0,
+ ) : SimpleJob(steps) {
+
+ override val id = "Job.Math"
+
+ override fun beforeJob(jobExecution: JobExecution) {
+ jobExecution.context["VALUE"] = initialValue
+ }
+ }
+
+ private abstract class MathStep : Step {
+
+ @Volatile private var running = false
+
+ protected abstract fun compute(value: Double): Double
+
+ final override fun execute(stepExecution: StepExecution): StepResult {
+ var sleepCount = 0
+
+ val jobExecution = stepExecution.jobExecution
+ running = jobExecution.canContinue
+
+ while (running && sleepCount++ < 100) {
+ Thread.sleep(10)
+ }
+
+ if (running) {
+ synchronized(jobExecution) {
+ val value = jobExecution.context["VALUE"]!! as Double
+ LOG.info("executing ${javaClass.simpleName}: $value")
+ jobExecution.context["VALUE"] = compute(value)
+
+ if (value >= 10.0 && value < 19.0) {
+ return StepResult.CONTINUABLE
+ }
+ }
+ }
+
+ return StepResult.FINISHED
+ }
+
+ override fun stop(mayInterruptIfRunning: Boolean) {
+ running = false
+ }
+ }
+
+ private class SumStep : MathStep() {
+
+ override fun compute(value: Double): Double {
+ return value + 1.0
+ }
+ }
+
+ private class SplitSumStep : SimpleSplitStep() {
+
+ init {
+ repeat(N) {
+ add(SumStep())
+ }
+ }
+ }
+
+ private class FlowSumStep : SimpleFlowStep() {
+
+ init {
+ repeat(N) {
+ add(SumStep())
+ }
+ }
+ }
+
+ companion object {
+
+ @JvmStatic private val LOG = loggerFor()
+ @JvmStatic private val N = Runtime.getRuntime().availableProcessors()
+ }
+}
diff --git a/nebulosa-common/src/main/kotlin/nebulosa/common/concurrency/CancellationToken.kt b/nebulosa-common/src/main/kotlin/nebulosa/common/concurrency/CancellationToken.kt
new file mode 100644
index 000000000..00d954d68
--- /dev/null
+++ b/nebulosa-common/src/main/kotlin/nebulosa/common/concurrency/CancellationToken.kt
@@ -0,0 +1,60 @@
+package nebulosa.common.concurrency
+
+import java.io.Closeable
+import java.util.concurrent.Future
+import java.util.concurrent.TimeUnit
+
+class CancellationToken : Closeable, Future {
+
+ private val latch = CountUpDownLatch(1)
+ private val listeners = LinkedHashSet()
+
+ fun listen(action: Runnable): Boolean {
+ return if (isDone) {
+ action.run()
+ false
+ } else {
+ listeners.add(action)
+ }
+ }
+
+ fun cancel() {
+ cancel(true)
+ }
+
+ @Synchronized
+ override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
+ if (latch.count <= 0) return false
+ latch.reset()
+ listeners.forEach(Runnable::run)
+ listeners.clear()
+ return true
+ }
+
+ override fun isCancelled(): Boolean {
+ return latch.get()
+ }
+
+ override fun isDone(): Boolean {
+ return latch.get()
+ }
+
+ override fun get(): Boolean {
+ latch.await()
+ return true
+ }
+
+ override fun get(timeout: Long, unit: TimeUnit): Boolean {
+ return latch.await(timeout, unit)
+ }
+
+ fun reset() {
+ latch.countUp(1 - latch.count)
+ listeners.clear()
+ }
+
+ override fun close() {
+ latch.reset()
+ listeners.clear()
+ }
+}
diff --git a/nebulosa-common/src/main/kotlin/nebulosa/common/concurrency/CountUpDownLatch.kt b/nebulosa-common/src/main/kotlin/nebulosa/common/concurrency/CountUpDownLatch.kt
index 884b96376..3b373dddf 100644
--- a/nebulosa-common/src/main/kotlin/nebulosa/common/concurrency/CountUpDownLatch.kt
+++ b/nebulosa-common/src/main/kotlin/nebulosa/common/concurrency/CountUpDownLatch.kt
@@ -6,15 +6,13 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.AbstractQueuedSynchronizer
import kotlin.math.max
-class CountUpDownLatch(initialCount: Int = 0) : AtomicBoolean(true) {
+class CountUpDownLatch(initialCount: Int = 0) : AtomicBoolean(initialCount == 0) {
private val sync = Sync(this)
init {
- if (initialCount > 0) {
- sync.count = initialCount
- set(false)
- }
+ require(initialCount >= 0) { "initialCount < 0: $initialCount" }
+ sync.count = initialCount
}
val count
diff --git a/nebulosa-guiding-phd2/build.gradle.kts b/nebulosa-guiding-phd2/build.gradle.kts
index 38ee1e398..7c9baa997 100644
--- a/nebulosa-guiding-phd2/build.gradle.kts
+++ b/nebulosa-guiding-phd2/build.gradle.kts
@@ -5,7 +5,6 @@ plugins {
dependencies {
api(project(":nebulosa-io"))
- api(project(":nebulosa-common"))
api(project(":nebulosa-guiding"))
api(project(":nebulosa-phd2-client"))
implementation(project(":nebulosa-log"))
diff --git a/nebulosa-guiding-phd2/src/main/kotlin/nebulosa/guiding/phd2/PHD2Guider.kt b/nebulosa-guiding-phd2/src/main/kotlin/nebulosa/guiding/phd2/PHD2Guider.kt
index 766c38366..daaa58bcc 100644
--- a/nebulosa-guiding-phd2/src/main/kotlin/nebulosa/guiding/phd2/PHD2Guider.kt
+++ b/nebulosa-guiding-phd2/src/main/kotlin/nebulosa/guiding/phd2/PHD2Guider.kt
@@ -1,5 +1,6 @@
package nebulosa.guiding.phd2
+import nebulosa.common.concurrency.CancellationToken
import nebulosa.common.concurrency.CountUpDownLatch
import nebulosa.guiding.*
import nebulosa.log.loggerFor
@@ -19,7 +20,7 @@ class PHD2Guider(private val client: PHD2Client) : Guider, PHD2EventListener {
@Volatile private var shiftRateAxis = ShiftAxesType.RADEC
@Volatile private var lockPosition = GuidePoint.ZERO
@Volatile private var starPosition = GuidePoint.ZERO
- private val listeners = hashSetOf()
+ private val listeners = LinkedHashSet()
override var pixelScale = 1.0
private set
@@ -232,14 +233,16 @@ class PHD2Guider(private val client: PHD2Client) : Guider, PHD2EventListener {
}
}
- override fun waitForSettle() {
+ override fun waitForSettle(cancellationToken: CancellationToken?) {
try {
+ cancellationToken?.listen(settling::reset)
settling.await(settleTimeout)
} catch (e: InterruptedException) {
LOG.warn("PHD2 did not send SettleDone message in expected time")
} catch (e: Throwable) {
LOG.warn("an error occurrs while waiting for settle done", e)
} finally {
+ cancellationToken?.close()
settling.reset()
}
}
diff --git a/nebulosa-guiding/build.gradle.kts b/nebulosa-guiding/build.gradle.kts
index 9ad896cec..f891151a0 100644
--- a/nebulosa-guiding/build.gradle.kts
+++ b/nebulosa-guiding/build.gradle.kts
@@ -5,6 +5,7 @@ plugins {
dependencies {
api(project(":nebulosa-math"))
+ api(project(":nebulosa-common"))
api(project(":nebulosa-indi-device"))
testImplementation(project(":nebulosa-test"))
}
diff --git a/nebulosa-guiding/src/main/kotlin/nebulosa/guiding/Guider.kt b/nebulosa-guiding/src/main/kotlin/nebulosa/guiding/Guider.kt
index 4e37d4626..178a039c3 100644
--- a/nebulosa-guiding/src/main/kotlin/nebulosa/guiding/Guider.kt
+++ b/nebulosa-guiding/src/main/kotlin/nebulosa/guiding/Guider.kt
@@ -1,5 +1,6 @@
package nebulosa.guiding
+import nebulosa.common.concurrency.CancellationToken
import java.io.Closeable
import java.time.Duration
@@ -35,7 +36,7 @@ interface Guider : Closeable {
fun dither(amount: Double, raOnly: Boolean = false)
- fun waitForSettle()
+ fun waitForSettle(cancellationToken: CancellationToken? = null)
companion object {
diff --git a/nebulosa-indi-client/src/main/kotlin/nebulosa/indi/client/device/camera/CameraDevice.kt b/nebulosa-indi-client/src/main/kotlin/nebulosa/indi/client/device/camera/CameraDevice.kt
index 5e87324cc..5d96adf67 100644
--- a/nebulosa-indi-client/src/main/kotlin/nebulosa/indi/client/device/camera/CameraDevice.kt
+++ b/nebulosa-indi-client/src/main/kotlin/nebulosa/indi/client/device/camera/CameraDevice.kt
@@ -339,14 +339,14 @@ internal open class CameraDevice(
override fun close() {
if (hasThermometer) {
- hasThermometer = false
handler.unregisterThermometer(this)
+ hasThermometer = false
LOG.info("thermometer detached: {}", name)
}
if (canPulseGuide) {
- canPulseGuide = false
handler.unregisterGuideOutput(this)
+ canPulseGuide = false
LOG.info("guide output detached: {}", name)
}
}
diff --git a/nebulosa-phd2-client/src/main/kotlin/nebulosa/phd2/client/PHD2Client.kt b/nebulosa-phd2-client/src/main/kotlin/nebulosa/phd2/client/PHD2Client.kt
index f192dfcac..9d3789923 100644
--- a/nebulosa-phd2-client/src/main/kotlin/nebulosa/phd2/client/PHD2Client.kt
+++ b/nebulosa-phd2-client/src/main/kotlin/nebulosa/phd2/client/PHD2Client.kt
@@ -23,7 +23,7 @@ import kotlin.math.max
class PHD2Client : NettyClient() {
- @JvmField internal val listeners = hashSetOf()
+ @JvmField internal val listeners = LinkedHashSet()
@JvmField internal val commands = hashMapOf>()
override val channelInitialzer = object : ChannelInitializer() {
diff --git a/nebulosa-stellarium-protocol/src/main/kotlin/nebulosa/stellarium/protocol/StellariumProtocolServer.kt b/nebulosa-stellarium-protocol/src/main/kotlin/nebulosa/stellarium/protocol/StellariumProtocolServer.kt
index 046a8cded..b815be467 100644
--- a/nebulosa-stellarium-protocol/src/main/kotlin/nebulosa/stellarium/protocol/StellariumProtocolServer.kt
+++ b/nebulosa-stellarium-protocol/src/main/kotlin/nebulosa/stellarium/protocol/StellariumProtocolServer.kt
@@ -71,7 +71,7 @@ class StellariumProtocolServer(
) : NettyServer(), CurrentPositionHandler {
private val stellariumMountHandler = AtomicReference()
- private val currentPositionHandlers = hashSetOf()
+ private val currentPositionHandlers = LinkedHashSet()
val rightAscension: Angle?
get() = stellariumMountHandler.get()?.rightAscension
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 925045edd..bc2c3728e 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -53,6 +53,7 @@ include(":nebulosa-alpaca-discovery-protocol")
include(":nebulosa-astap")
include(":nebulosa-astrometrynet")
include(":nebulosa-astrometrynet-jna")
+include(":nebulosa-batch-processing")
include(":nebulosa-common")
include(":nebulosa-constants")
include(":nebulosa-erfa")