local m = {}
local UNIQUE = {}
local unpack = unpack or table.unpack
A standard for Lua callbacks that enables lightweight observables and iterables.
(type: 'start' | 'data' | 'end', payload?: any) => void
Definitions
(type: 'start' | 'data' | 'end', payload?: any) => void
'start'
as the first argument, we
say âthe callbag is greetedâ, while the code which performed the call
âgreets the callbagâ'data'
as the first argument, we
say âthe callbag is delivered dataâ, while the code which performed the
call âdelivers data to the callbagâ'end'
as the first argument, we
say âthe callbag is terminatedâ, while the code which performed the call
âterminates the callbagâBasic callbag factories and operators to get started with using the callbag spec.
Mostly ported from @staltzâs
JavaScript callbag-basics
.
local m = {}
local UNIQUE = {}
local unpack = unpack or table.unpack
function m.fromObs(observable)
return function (start, sink)
if start ~= 'start' then return end
local dispose
sink('start', function (t)
if t == 'end' and dispose then
if dispose.unsubscribe then
dispose.unsubscribe()
else
dispose()
end
end
end)
dispose = observable.subscribe({
next = function (x) return sink('data', x) end,
error = function (e) return sink('end', e) end,
complete = function () return sink('end') end,
})
end
end
A callbag pullable source (it only sends data when requested) that sends an arithmetic progression. Goes from var (inclusive) to limit (inclusive) in increments of step.
Ported from @franciscotlnâs
JavaScript callbag-range
.
function m.range(var, limit, step)
var, limit = tonumber(var), tonumber(limit)
if step == nil then step = 1 else step = tonumber(step) end
if not (var and limit and step) then
error('arguments not convertible to numbers')
end
return function (start, sink)
if start ~= 'start' then return end
local inLoop = false
local gotData = false
local completed = false
local function loop()
inLoop = true
while gotData and not completed do
gotData = false
if (step > 0 and var > limit) or (step <= 0 and var < limit)
then
sink('end')
completed = true
else
sink('data', var)
var = var + step
end
end
inLoop = false
end
sink('start', function (t)
if completed then return end
if t == 'data' then
gotData = true
if not inLoop then return loop() end
elseif t == 'end' then
completed = true
end
end)
end
end
Convert an array-like object (integer indices increasing from 1) to a callbag pullable source (it only sends data when requested).
function m.fromArray(arr)
return function (start, sink)
if start ~= 'start' then return end
local inLoop = false
local gotData = false
local completed = false
local index, res = 1, nil
local function loop()
inLoop = true
while gotData and not completed do
gotData = false
res = arr[index]
if res == nil then
sink('end')
completed = true
else
sink('data', res)
index = index + 1
end
end
inLoop = false
end
sink('start', function (t)
if completed then return end
if t == 'data' then
gotData = true
if not inLoop then loop() end
elseif t == 'end' then
completed = true
end
end)
end
end
function m.fromIter(f, s, var)
return function (start, sink)
if start ~= 'start' then return end
local inLoop = false
local gotData = false
local completed = false
local res
local function loop()
inLoop = true
while gotData and not completed do
gotData = false
res = {f(s, var)}
var = res[1]
if var == nil then
sink('end')
break
else
sink('data', res)
end
end
inLoop = false
end
sink('start', function (t)
if completed then return end
if t == 'data' then
gotData = true
if not inLoop and var ~= nil then loop() end
elseif t == 'end' then
completed = true
end
end)
end
end
Callbag sink that consumes both pullable and listenable sources. When called on a pullable source, it will iterate through its data. When called on a listenable source, it will observe its data.
function m.forEach(operation) return function (source)
local talkback
source('start', function (t, d)
if t == 'start' then talkback = d
elseif t == 'data' then operation(d) end
if t == 'start' or t == 'data' then talkback('data') end
end)
end end
function m.toIter(source)
return coroutine.wrap(function ()
local talkback
local var
source('start', function (t, d)
if t == 'start' then talkback = d
elseif t == 'data' then var = d
elseif t == 'end' then talkback = nil end
end)
while talkback do
talkback('data')
if var ~= nil then
coroutine.yield(var)
var = nil
else
if talkback then talkback('end') end
return
end
end
end)
end
Callbag operator that applies a transformation on data passing through it. Works on either pullable or listenable sources.
function m.map(f) return function (source)
return function (start, sink)
if start ~= 'start' then return end
source('start', function (t, d)
if t == 'data' then d = f(d) end
sink(t, d)
end)
end
end end
Callbag operator that combines consecutive values from the same source. Itâs
essentially like JavaScriptâs array .reduce
, but delivers a new accumulated
value for each value from the callbag source. Works on either pullable or
listenable sources.
function m.scan(...)
local reducer, seed = ...
local hasAcc = select('#', ...) == 2
return function (source)
return function (start, sink)
if start ~= 'start' then return end
local acc = seed
source('start', function (t, d)
if t == 'data' then
if hasAcc then
acc = reducer(acc, d)
else
hasAcc = true
acc = d
end
sink(t, acc)
else sink(t, d) end
end)
end
end
end
Callbag operator that flattens a higher-order callbag source. Like RxJS âswitchâ or xstream âflattenâ. Use it with map to get behavior equivalent to âswitchMapâ. Works on either pullable or listenable sources.
function m.flatten(source)
return function (start, sink)
if start ~= 'start' then return end
local outerEnded = false
local outerTalkback, innerTalkback
local function talkback(t, d)
if t == 'data' then (innerTalkback or outerTalkback)(t, d)
elseif t == 'end' then
if innerTalkback then innerTalkback(t) end
outerTalkback(t)
end
end
source('start', function (T, D)
if T == 'start' then
outerTalkback = D
sink(T, talkback)
elseif T == 'data' then
local innerSource = D
if innerTalkback then innerTalkback('end') end
innerSource('start', function (t, d)
if t == 'start' then
innerTalkback = d
innerTalkback('data')
elseif t == 'data' then sink(t, d)
elseif t == 'end' then
if d then
outerTalkback(t)
sink(t, d)
else
if outerEnded then sink(t)
else
innerTalkback = nil
outerTalkback('data')
end
end
end
end)
elseif T == 'end' then
if D then
if innerTalkback then innerTalkback(T) end
sink(T, D)
else
if not innerTalkback then sink(T)
else outerEnded = true end
end
end
end)
end
end
Callbag operator that limits the amount of data sent by a source. Works on either pullable and listenable sources.
function m.take(max) return function (source)
return function (start, sink)
if start ~= 'start' then return end
local taken = 0
local sourceTalkback
local function talkback(t, d)
if taken < max then sourceTalkback(t, d) end
end
source('start', function (t, d)
if t == 'start' then
sourceTalkback = d
sink(t, talkback)
elseif t == 'data' then
if taken < max then
taken = taken + 1
sink(t, d)
if taken == max then
sink('end')
sourceTalkback('end')
end
end
else
sink(t, d)
end
end)
end
end end
Callbag operator that skips the first N data points of a source. Works on either pullable and listenable sources.
function m.skip(max) return function (source)
return function (start, sink)
if start ~= 'start' then return end
local skipped = 0
local talkback
source('start', function (t, d)
if t == 'start' then
talkback = d
sink(t, d)
elseif t == 'data' then
if skipped < max then
skipped = skipped + 1
talkback('data')
else sink(t, d) end
else
sink(t, d)
end
end)
end
end end
Callbag operator that conditionally lets data pass through. Works on either pullable or listenable sources.
function m.filter(condition) return function (source)
return function(start, sink)
if start ~= 'start' then return end
local talkback
source('start', function (t, d)
if t == 'start' then
talkback = d
sink(t, d)
elseif t == 'data' then
if condition(d) then sink(t, d)
else talkback('data') end
else sink(t, d) end
end)
end
end end
Callbag factory that merges data from multiple callbag sources. Works well with listenable sources, and while it may work for some pullable sources, it is only designed for listenable sources.
Callbag factory that concatenates the data from multiple (2 or more) callbag sources. It starts each source at a time: waits for the previous source to end before starting the next source. Works with both pullable and listenable sources.
function m.concat(...)
local n = select('#', ...)
local sources = {...}
return function (start, sink)
if start ~= 'start' then return end
if n == 0 then
sink('start', function () end)
sink('end')
return
end
local i = 1
local sourceTalkback
local lastPull = UNIQUE
local function talkback(t, d)
if t == 'data' then lastPull = d end
sourceTalkback(t, d)
end
local function next()
if i > n then
sink('end')
return
end
sources[i]('start', function (t, d)
if t == 'start' then
sourceTalkback = d
if i == 1 then
sink(t, talkback)
elseif lastPull ~= UNIQUE then
sourceTalkback('data', lastPull)
end
elseif t == 'end' then
if d then
sink(t, d)
else
i = i + 1
return next()
end
else
sink(t, d)
end
end)
end
return next()
end
end
Callbag factory that combines the latest data points from multiple (2 or more) callbag sources. It delivers those latest values as an array. Works with both pullable and listenable sources.
Callbag operator that broadcasts a single source to multiple sinks. Does
reference counting on sinks and starts the source when the first sink gets
connected, similar to RxJS
.share()
.
Works on either pullable or listenable sources.
function m.share(source)
local sinks = {}
local sourceTalkback
local function shared(start, sink)
if start ~= 'start' then return end
table.insert(sinks, sink)
local function talkback(t, d)
if t == 'end' then
local i, found = 1, false
while not found do
local v = sinks[i]
if v == nil then break end
if v == sink then found = true end
i = i + 1
end
if found then table.remove(sinks, i) end
if #sinks == 0 then sourceTalkback(t) end
else
sourceTalkback(t, d)
end
end
if #sinks == 1 then
source('start', function (t, d)
if t == 'start' then
sourceTalkback = d
sink(t, talkback)
else
local sinksCopy = {}
for i, v in ipairs(sinks) do sinksCopy[i] = v end
for _, s in ipairs(sinksCopy) do s(t, d) end
end
if t == 'end' then sinks = {} end
end)
else
sink('start', talkback)
end
end
return shared
end
Utility function for plugging callbags together in chain. This utility
actually doesnât rely on Callbag specifics, and is basically the same as
Ramdaâs pipe
or lodashâs flow
. Anyway, this exists just to play nicely
with the ecosystem, and to facilitate the import of the function.
function m.pipe(...)
local res = {(...)}
for i = 2, select('#', ...) do
res = {select(i, ...)(unpack(res))}
end
return unpack(res)
end
function m.pipeValues(...)
local res = ...
for i = 2, select('#', ...) do
res = {select(i, ...)(unpack(res))}
end
return unpack(res)
end
return m