Kno Threads
Kno is designed to support robust, performant multi-threaded libraries, services, and applications. This support starts with the fine-grained locking and lockless GC of the Kno runtime. It extends into Kno's server/servlet architecture, where asynchronous I/O keeps worker/content threads from blocking on network I/O. It is realized in the Kno language through both high-level constructs for using multiple threads and CPUs and lower-level functions for implementing new high-level constructs.
The simplest high-level construct for using multiple threads is `PARALLEL`, which evaluates its arguments in separate threads, waits for them to finish, and then returns all of the results as a choice.
For example:
```console
#|kno>|# (parallel (urlcontent "https://example.org/status/alpha")
                   (urlcontent "https://example.org/status/beta")
                   (urlcontent "https://example.org/status/gamma"))
;; Example
{"alpha:done" "beta:waiting" "gamma:running"}
```
This starts three threads fetching three URLs. When all of them finish, the
results of each URL are returned as a choice.
Another high-level construct is the `SPAWN` form, which evaluates an
expression in a separate thread and returns the thread object(s). If
the argument is a choice, a separate thread evaluates each alternative.
```console
#|kno>|# (spawn {(urlcontent "https://example.org/status/alpha")
                 (urlcontent "https://example.org/status/beta")
                 (urlcontent "https://example.org/status/gamma")})
;; Returns a choice of three thread objects, one per URL
```
`SPAWN` takes an optional second argument of *threadopts*, which are explained below.
`THREAD/EVAL` is like `EVAL` but returns a new thread evaluating its first argument. It can also take an optional explicit environment argument and an optional *threadopts* argument. So, we can use choice evaluation to generate multiple expressions to evaluate:
```console
#|kno>|# (thread/eval (list 'urlcontent
                            (glom "https://example.org/status/"
                                  {"alpha" "beta" "gamma"})))
```
`THREAD/RESULT` returns the value of a thread's computation when it's finished or `#f` otherwise, so we might have:
```console
#|kno>|# (thread/result
          (spawn {(urlcontent "https://example.org/status/alpha")
                  (urlcontent "https://example.org/status/beta")
                  (urlcontent "https://example.org/status/gamma")}))
{"alpha:done" #f "gamma:running"}
#|kno>|# (thread/result
          (spawn {(urlcontent "https://example.org/status/alpha")
                  (urlcontent "https://example.org/status/beta")
                  (urlcontent "https://example.org/status/gamma")}))
{"alpha:done" "beta:waiting" "gamma:running"}
```
`THREAD/FINISH` is just like `THREAD/RESULT` but waits for the thread to finish first.
`(thread? *object*)` returns `#t` if *object* is a thread.
`(thread-id *thread*)` returns the operating system's numeric identifier for *thread*. This value is dependent on the underlying operating system. The returned id can be used by `FIND-THREAD` to locate the thread later.
`(find-thread *id* [*err*])` returns a thread given the operating system's numeric identifier, if the thread was launched by Kno. If *err* is true, this signals an error if it fails; otherwise it returns false.
`(thread/exited? *thread*)` returns `#t` if the computation in *thread* has exited, either normally or with an error.
`(thread/error? *thread*)` returns `#t` if *thread* aborted with an error during its execution.
`(thread/finished? *thread*)` returns `#t` if *thread* has finished without an error.
`(thread/result *threads*)` returns the result of each thread in *threads*, or `#f` for a thread that is still running.
`(thread/finish *threads*)` waits for each thread in *threads* to finish and returns its `thread/result`.
`(thread/join *threads* [*opts*])` takes a set of threads (a choice) and waits for them to exit. *opts* can specify how long `thread/join` will wait for the threads to exit; in that case, it returns all of the *threads* which have exited. *opts* is either:
- a number of seconds (integer or flonum)
- a timestamp
- an opts structure with a `timeout` slot holding one of the above.
`(thread/wait! *threads* [*opts*])` is just like `thread/join` but returns the unfinished threads when a timeout is specified.
`(thread/finish *threads* [*opts*])` is like `thread/join` but returns the actual results (`thread/result`) of each thread which has exited. Note that if a thread exited with an error, the "result" is an exception object.
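As a rough sketch of how these functions fit together, the following launches three fetches, waits a bounded time for them, and then inspects whichever threads have exited. It uses only the functions described above. The two-second timeout and the names `workers` and `done` are illustrative assumptions, and the URLs are the placeholders used in the earlier examples.

```scheme
;; Launch one thread per URL; `workers` is a choice of thread objects.
(define workers
  (spawn {(urlcontent "https://example.org/status/alpha")
          (urlcontent "https://example.org/status/beta")
          (urlcontent "https://example.org/status/gamma")}))

;; Wait at most 2 seconds (an arbitrary timeout chosen for this sketch);
;; `done` holds only the threads that have exited by then.
(define done (thread/join workers 2))

;; Check which of the exited threads aborted with an error.
(thread/error? done)

;; Results of the threads that have exited so far.
(thread/result done)

;; Block until every thread has finished and collect all of the results;
;; a thread that exited with an error contributes an exception object.
(thread/finish workers)
```

Using `thread/wait!` in place of `thread/join` here would, per its description, hand back the threads that are still running instead, which is convenient when the next step is to report on or keep waiting for the stragglers.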
Implementing multi-threaded code often requires synchronization between different threads to access shared data structures, report overall progress, etc. In Kno, this is done through several types of synchronizers. There are currently four basic kinds of synchronizers:
- synchronized lambdas
- mutexes
- read/write locks
- condition variables
A description of thread-based programming is beyond the scope of this document, but mutexes, read/write locks, and condition variables are standard multi-threading constructs. Synchronized lambdas have counterparts in some other languages, such as methods declared with Java's `synchronized` keyword. Basically, the idea is that a synchronized lambda can only be active in one thread at a time. If it is called in one thread while it is already running in another, the call waits until the other thread has finished with it.
Consider the following code:
```console
(define counter 0)
(define bump-counter (lambda ((n 1)) (set! counter (+ counter n))))
#|kno>|# (begin (dotimes (i 100) (bump-counter 5))
                (dotimes (i 100) (bump-counter 2))
                (dotimes (i 100) (bump-counter 3)))
#|kno>|# counter
1000
```
The value of `counter` at the end is 1000, which we would expect because we added each of 5, 2, and 3 repeated 100 times. Now, suppose we want to run our loops in parallel, e.g.:
```console
(define counter 0)
(define bump-counter (lambda ((n 1)) (set! counter (+ counter n))))
#|kno>|# (thread/finish (spawn {(dotimes (i 100) (bump-counter 5))
                                (dotimes (i 100) (bump-counter 2))
                                (dotimes (i 100) (bump-counter 3))}))
#|kno>|# counter
975
```
The final value of `counter` should be 1000, but the calls to `bump-counter` have interfered with one another. Suppose thread #1 and thread #2 both read the same value of `counter`. They each add their own increment to that value and store the result back into `counter`. However, `counter` will only reflect one of the `bump-counter` calls, whichever stored its result last. (Note that there is a small chance, depending on the thread scheduler, that we would get the right answer, but it's a small chance.)
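One possible interleaving makes the lost update concrete. The starting value of 40 is an arbitrary assumption chosen for illustration; the increments of 5 and 2 come from the loops above.

```scheme
;; counter = 40
;;
;; thread #1 (bump-counter 5)        thread #2 (bump-counter 2)
;; --------------------------        --------------------------
;; reads counter        -> 40
;;                                   reads counter        -> 40
;; computes (+ 40 5)    -> 45
;;                                   computes (+ 40 2)    -> 42
;; stores 45 in counter
;;                                   stores 42 in counter
;;
;; counter = 42, although two correct updates would have left 47:
;; thread #1's increment of 5 has been overwritten.
```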
This can be fixed by making `bump-counter` into a synchronized function, which can't interfere with itself.
```console
(define counter 0)
(define bump-counter (slambda ((n 1)) (set! counter (+ counter n))))
#|kno>|# (thread/finish (spawn {(dotimes (i 100) (bump-counter 5))
                                (dotimes (i 100) (bump-counter 2))
                                (dotimes (i 100) (bump-counter 3))}))
#|kno>|# counter
1000
```
The steps of reading, incrementing, and storing the value of `counter` are all handled by `bump-counter`, which can only be active in one thread at a time.
Kno provides three generic locking expressions which work on all synchronizers:
- `(SYNCHRO/LOCK! *synchronizer*)` acquires a lock on *synchronizer*; if another thread currently has that lock, `SYNCHRO/LOCK!` waits for it to be released and then locks it.
- `(SYNCHRO/RELEASE! *synchronizer*)` releases its lock on *synchronizer*, freeing it to be locked by other threads.
- `(WITH-LOCK *synchronizer* *body exprs...*)` acquires a lock on *synchronizer*, evaluates all of *body exprs*, and then releases the lock.
For read/write locks, `SYNCHRO/LOCK!` acquires a write lock, which waits for all read locks to be released and keeps further read locks from being acquired until it is unlocked. `SYNCHRO/READ/LOCK!` acquires a read lock in this case.
For condition variables, `SYNCHRO/LOCK!` acquires the lock on the condition variable's mutex, which will keep the variable from being updated until it is released.
Finally, for a synchronized function, `SYNCHRO/LOCK!` acquires the function's internal lock, excluding calls to the function until it is released.
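The following is a minimal sketch of the generic locking expressions applied to a synchronized function, relying on the statement above that its internal lock can be taken like any other synchronizer. The names `hits`, `misses`, `snapshot`, and `record!` are hypothetical and exist only for this illustration.

```scheme
(define hits 0)
(define misses 0)
(define snapshot #f)

;; A synchronized function: only one thread at a time can be inside it.
(define record!
  (slambda (hit?)
    (if hit?
        (set! hits (+ hits 1))
        (set! misses (+ misses 1)))))

;; While the lock is held, no thread can run record!, so the two
;; counters are read as a consistent pair.
(with-lock record!
  (set! snapshot (cons hits misses)))

;; The same critical section written with explicit lock and release calls.
(synchro/lock! record!)
(set! snapshot (cons hits misses))
(synchro/release! record!)
```

The locked region deliberately avoids calling `record!` itself: since the lock excludes calls to the function, such a call would presumably wait on the lock its own thread is holding. `WITH-LOCK` is likely the safer form of the two, because the release cannot be forgotten.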
Kno has a simple facility for maintaining thread-local variables. These are distinct from request variables, which are local to the processing of a particular request and may move from thread to thread as the request proceeds.
Thread variables are a flat namespace of symbols accessed via:
- `(THREAD/GET *varname*)` gets the thread-local value of the symbol *varname* (which is evaluated) and returns it. If *varname* is not bound in the current thread, this returns `{}` (the empty choice).
- `(THREAD/BOUND? *varname*)` returns `#t` if *varname* is bound in the current thread.
- `(THREAD/SET! *varname* *value*)` binds *varname* to *value* in the current thread, replacing the current value if there is one.
- `(THREAD/ADD! *varname* *value*)` adds *value* to the bindings of *varname* in the current thread, creating a choice if necessary.
`THREAD/RESET-VARS!` resets all of the thread-local variables for the current thread.
`(THREAD/CACHE *varname* *default*)` returns the value of *varname* in the current thread if it exists. If it doesn't exist, *default* is evaluated to generate a value which is both returned and saved (as *varname*) in the current thread.
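A short sketch of these calls used together follows. The variable names `phase`, `seen`, and `status` are hypothetical. The sketch also assumes two things the text does not state: that `THREAD/CACHE` takes a quoted symbol the same way `THREAD/GET` does, and that `THREAD/RESET-VARS!` is called with no arguments.

```scheme
;; Bind a thread-local variable; the quoted symbol is the variable name.
(thread/set! 'phase "fetching")
(thread/get 'phase)       ; fetches the value stored for this thread
(thread/bound? 'phase)    ; tests the binding in this thread

;; Accumulate several values under one name, building up a choice.
(thread/add! 'seen "alpha")
(thread/add! 'seen "beta")
(thread/get 'seen)

;; Compute a value the first time it is needed in this thread and
;; reuse the saved value on later calls (quoting is an assumption).
(thread/cache 'status (urlcontent "https://example.org/status/alpha"))

;; Discard every thread-local binding made above (zero-argument call
;; is an assumption).
(thread/reset-vars!)
```

Because each thread sees only its own bindings, a thread started with `SPAWN` would not observe `phase` or `seen` as set here. That makes these variables suited to per-thread scratch state rather than to sharing data between threads.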