Reactive Programming?
I was just sitting down this morning, toying again with the notion of reactive programming. And I suddenly had the realization that, for many (most?) use cases, the complexity of multithreaded implementation, as in my previous versions, was probably unnecessary. In fact the main body could be condensed down to about 40 lines of code, give or take…Here it is in one thread, using a priority queue to schedule items of work. The priorities of the queue are unused in the simple use cases shown here. But in something like my GigaDSP platform, where we have multi-rate algorithms wired up, the use of priorities allows for interstitial dispatch - e.g., Runge-Kutta integration which needs 4 sub-steps in each major step. Those sub-steps would be inserted at a higher priority in the queue to place them ahead of the next major processing step.
;; -------------------------------------------------------
(defpackage #:reactive
(:use #:common-lisp)
(:export
))
(in-package #:reactive)
(defclass reactor-node ()
((subscribers :accessor rn-subs :initform nil)
))
(defmethod subscribe ((rn reactor-node) fn)
(pushnew fn (rn-subs rn)))
(defmethod unsubscribe ((rn reactor-node) fn)
(setf (rn-subs rn) (delete fn (rn-subs rn))))
(defmethod publish ((rn reactor-node) data &key (level 0))
(qappend (mapcar (lambda (fn)
(lambda ()
(funcall fn data)))
(rn-subs rn))
level))
;; -------------------------------------------------------
;; The computation engine... single threaded, based on a priority queue
(defvar *running* nil)
(defvar *todo* (maps:empty))
(defun do-qdo (fn)
(funcall fn)
(unless *running*
(let ((*running* t))
(um:nlet-tail iter ()
(let ((thunk (qnext)))
(when thunk
(ignore-errors (funcall thunk))
(iter))))
)))
(defmacro qdo (&body body)
`(do-qdo (lambda () ,@body)))
(defun qappend (lst &optional (level 0))
(qdo
(let ((q (maps:find level *todo*)))
(setf *todo* (maps:add level (nconc q lst) *todo*)))))
(defun do-qadd (item &optional (level 0))
(qappend (list item) level))
(defmacro qadd ((&optional (level 0)) &body body)
`(do-qadd (lambda () ,@body) ,level))
(editor:setup-indent "qadd" 1)
(defun qnext ()
(unless (maps:is-empty *todo*)
(let* ((ent (sets:max-elt *todo*))
(key (maps:map-cell-key ent))
(q (maps:map-cell-val ent))
(nxt (pop q)))
(setf *todo*
(if q
(maps:add key q *todo*)
(sets:remove key *todo*)))
nxt)))
;; ----------------------------------------------
;; Simple use cases...
(defvar *rn* (make-instance 'reactor-node))
(defun plot-it (data)
(typecase data
(sequence
(plt:plot 'plt data :clear t))))
(defvar *dbg* (debug-stream:make-debug-stream))
(defun print-it (data)
(debug-stream:pr *dbg* data))
(defun inspect-it (data)
(inspect data))
(subscribe *rn* 'plot-it)
(subscribe *rn* 'print-it)
(subscribe *rn* 'inspect-it)
(publish *rn* (vm:unoise 100 1.0))
(publish *rn* (loop for x from 0.0 to 1.57 by 0.05 collect (sin x)))
(loop repeat 10 do (publish *rn* (vm:unoise 100 1.0)))
(let ()
;; example of self-extending work items
(labels ((doit (lst)
(when lst
(print (car lst))
(qadd ()
(doit (cdr lst))))))
(qadd ()
(doit '(1 2 3)))
))
;; -------------------------------------------------------
One of the basic tools I keep coming back to, and used in the above, is an implementation of nearly balanced binary trees for Sets and Maps, and in this case a Map used as a Priority Queue. The code for my balanced binary trees was shamelessly borrowed from the OCaml standard library. The people at INRIA produce some really high quality code.
My initial translation from OCaml to Lisp used bare 4-element Lists as tuples for tree nodes. It was very speedy under optimizations. But the other day, my Cat “Chara” decided to insert a few characters in my code and all hell broke lose on running his implementation.
So I broke down and reimplemented it with CLOS and overt tree-node classes, to disambiguate them from bare Lists. Bare Lists are just too prevalent in Lisp and there are no distinguishing characteristics for them. CLOS allows me to distinguish tree-nodes from Chara’s augmented lists…
- DM