Skip to content

Commit

Permalink
Add status method to Agent
Browse files Browse the repository at this point in the history
This allows users to query whether the agent is applying state
(status `running`), listening for sensor changes (status `idle`)
is `stopping` or it has been `stopped`.

Change-type: minor
  • Loading branch information
pipex committed Nov 10, 2023
1 parent 943ce32 commit e29313e
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 12 deletions.
36 changes: 35 additions & 1 deletion lib/agent.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,41 @@ describe('Agent', () => {
agent.stop();
});

it('allows to use observables as actions', async () => {
it('it allows to query the agent state', async () => {
const inc = Task.from<number>({
condition: (state, { target }) => state < target,
effect: (state) => ++state._,
description: 'increment',
});
const agent = Agent.from({
initial: 0,
opts: { logger: logger, minWaitMs: 10 },
tasks: [inc],
});

expect(agent.status()).to.equal('stopped');
// Subscribe to the count
const count: number[] = [];
agent.subscribe((s) => count.push(s));

agent.seek(10);
await setTimeout(1);
expect(agent.status()).to.equal('running');

await expect(agent.wait()).to.eventually.deep.equal({
success: true,
state: 10,
});

expect(agent.status()).to.equal('idle');

// Intermediate states returned by the observable should be emitted by the agent
expect(count).to.deep.equal([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
agent.stop();
expect(agent.status()).to.equal('stopping');
});

it('allows to observe changes by an action', async () => {
const counter = Task.from<number>({
condition: (state, { target }) => state < target,
effect: (state, { target }) => {
Expand Down
29 changes: 23 additions & 6 deletions lib/agent/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import assert from '../assert';
import { NullLogger } from '../logger';
import { Subscribable, Subject } from '../observable';
import { Subscribable, Subject, Observable } from '../observable';
import { Planner } from '../planner';
import { Sensor } from '../sensor';
import { Target } from '../target';
import { Task } from '../task';
import { Runtime } from './runtime';
import { AgentOpts, NotStarted, Result } from './types';
import { Runtime, RuntimeState } from './runtime';
import { AgentOpts, AgentStatus, NotStarted, Result } from './types';

export * from './types';

Expand Down Expand Up @@ -126,6 +126,11 @@ export interface Agent<TState = any> extends Subscribable<TState> {
* immediately.
*/
stop(): void;

/**
* Get the agent status
*/
status(): AgentStatus;
}

type DeepPartial<T> = T extends any[] | ((...args: any[]) => any)
Expand Down Expand Up @@ -200,14 +205,19 @@ function from<TState>({
assert(opts.maxWaitMs > 0, 'opts.maxWaitMs must be greater than 0');
assert(opts.minWaitMs > 0, 'opts.minWaitMs must be greater than 0');

const subject: Subject<TState> = new Subject();
let status: AgentStatus = 'stopped';

// Subscribe to runtime changes to keep
// the local copy of state up-to-date
subject.subscribe((s) => {
const subject: Subject<RuntimeState<TState>> = new Subject();
subject.subscribe(({ state: s, status: rStatus }) => {
state = s;
status = rStatus;
});

// This is used to communicate state to the agent subscribers
const observable = Observable.from(subject).map(({ state: s }) => s);

let setupRuntime: Promise<Runtime<TState> | null> = Promise.resolve(null);

return {
Expand Down Expand Up @@ -241,10 +251,14 @@ function from<TState>({
// Reset the runtime
setupRuntime = Promise.resolve(null);

status = 'stopping';
return runtime.stop().then(() => {
// We notify subscribers of completion only
// when stop is called
subject.complete();

// Set the agent status back to stopped
status = 'stopped';
});
});
},
Expand All @@ -261,7 +275,10 @@ function from<TState>({
return state;
},
subscribe(next) {
return subject.subscribe(next);
return observable.subscribe(next);
},
status() {
return status;
},
};
}
Expand Down
30 changes: 25 additions & 5 deletions lib/agent/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { observe } from './observe';

import {
AgentOpts,
AgentStatus,
Failure,
NotStarted,
Result,
Expand Down Expand Up @@ -51,6 +52,11 @@ class PlanNotFound extends Error {
}
}

export interface RuntimeState<TState> {
status: AgentStatus;
state: TState;
}

export class Runtime<TState> {
private promise: Promise<Result<TState>> = Promise.resolve({
success: false,
Expand All @@ -62,8 +68,16 @@ export class Runtime<TState> {
private subscribed: Subscription[] = [];
private stateRef: Ref<TState>;

get status(): AgentStatus {
if (this.running) {
return 'running';
}

return 'idle';
}

constructor(
private readonly observer: Observer<TState>,
private readonly observer: Observer<RuntimeState<TState>>,
state: TState,
private readonly target: Target<TState>,
private readonly planner: Planner<TState>,
Expand All @@ -83,14 +97,14 @@ export class Runtime<TState> {
this.start();
} else {
// Notify the observer of the new state
this.observer.next(s);
this.observer.next({ status: this.status, state: s });
}
}),
);
}

public get state() {
return this.stateRef._;
return structuredClone(this.stateRef._);
}

private findPlan() {
Expand Down Expand Up @@ -138,7 +152,13 @@ export class Runtime<TState> {
// local state without the need of comparisons later.
// The observe() wrapper allows to notify the observer from every
// change to some part of the state
await observe(action, this.observer)(this.stateRef);
await observe(action, {
next: (s: TState) => {
this.observer.next({ status: this.status, state: s });
},
error: () => void 0,
complete: () => void 0,
})(this.stateRef);
} catch (e) {
throw new ActionRunFailed(action, e);
}
Expand Down Expand Up @@ -222,7 +242,7 @@ export class Runtime<TState> {
let found = false;

// Send the initial state to the observer
this.observer.next(structuredClone(this.stateRef._));
this.observer.next({ status: this.status, state: this.state });
logger.info('applying new target state');
while (!this.stopped) {
try {
Expand Down
9 changes: 9 additions & 0 deletions lib/agent/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ export type Result<T> =
| { success: true; state: T }
| { success: false; error: Error };

/**
* The agent runtime status
*
* - idle: The agent is not currently seeking a target state but may be listening for changes
* - running: The agent is actively seeking a target state
* - stopped: The agent has been stopped and is no longer listening for changes
*/
export type AgentStatus = 'idle' | 'running' | 'stopping' | 'stopped';

export interface AgentOpts {
/**
* Follow the system state and keep re-planning if the state goes off-target
Expand Down

0 comments on commit e29313e

Please sign in to comment.