Siddhi is a Streaming SQL language: it offers a SQL-like approach to processing data streams. This short code snippet shows one basic scenario, where you need to either partially update an existing record in an in-memory table or create a new record if none exists yet. The sample is built around calculating a running total per ID.
This example was tested in the Editor tool of WSO2 Stream Processor.
@App:name("Siddhi app name")
@App:description("Application description")
define stream inp (
    id int, -- key
    value double
);
-- I'll stream into log to show the result
@sink(type='log')
define stream log (
    id int,
    total double,
    created string, -- this property should get its value once and stay unchanged after that
    updated string -- this field should be updated each time the value is updated
);
-- in-memory table which keeps the latest state
define table tbl (
    id int,
    total double,
    created string,
    updated string
);
-- PROCESSING
-- calculate total
from inp
select
    id,
    sum(value) as total,
    time:utcTimestamp() as created,
    time:utcTimestamp() as updated
group by id
insert into tmp; -- streaming to a temporary stream
-- left join with the existing table
from tmp left outer join tbl
    on tbl.id == tmp.id
select
    -- these values will be inserted into the table if there is no record with this ID
    tmp.id,
    tmp.total,
    coalesce(tbl.created, tmp.created) as created,
    tmp.updated
update or insert into tbl
set
    -- this block updates only the properties listed here; created is left untouched
    tbl.total = total,
    tbl.updated = updated
on tbl.id == id;
-- to produce a data stream with the result,
-- I'll join the initial stream with the table and emit the values stored in the table
from inp left outer join tbl
    on tbl.id == inp.id
select
    tbl.id,
    tbl.total,
    tbl.created,
    tbl.updated
insert into log;
A clarification about joining the table with the original stream: a table cannot produce a stream by itself, so it needs some triggering mechanism. In this case I'm using the incoming stream as the trigger for generating the output stream.
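To make the intended behaviour easier to check, here is a minimal sketch in plain Python of what the queries above are meant to do for each incoming event. It is only a model of the semantics, not Siddhi code: the `upsert` function, the `totals` dictionary (standing in for the `sum(value)` aggregation state) and the `now` parameter (standing in for `time:utcTimestamp()`) are hypothetical names introduced for illustration.

```python
def upsert(table, totals, event_id, value, now):
    """Model of one input event: update the row partially or insert a new one."""
    # Running total per ID, like sum(value) ... group by id.
    totals[event_id] = totals.get(event_id, 0.0) + value

    row = table.get(event_id)
    if row is None:
        # No record with this ID yet: insert all four properties.
        table[event_id] = {
            "id": event_id,
            "total": totals[event_id],
            "created": now,
            "updated": now,
        }
    else:
        # Record exists: update only total and updated, keep created as it was.
        row["total"] = totals[event_id]
        row["updated"] = now

    # Return a copy, like the row emitted to the output stream.
    return dict(table[event_id])
```

Under these assumptions, sending the value 10.0 for ID 1 at time "t1" and then 5.0 for the same ID at time "t2" leaves a single row with a total of 15.0, `created` still equal to "t1", and `updated` equal to "t2".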