Siddhi is a language which belongs to the group of Streaming SQL languages, offering SQL-like approach to process data streams. This short code snippet shows one of the basics scenarios when you need to update in-memory partially or create a new record. The sample is build using scenario of calculating a total per some ID.
This example was tested in Editor tool of WSO2 Stream Processor.
@App:name("Siddhi app name") @App:description("Application description") define stream inp ( id int, -- key value double ); -- I'll stream into log to show the result @sink(type='log') define stream log ( id int, total double, created string, -- this property should get its value once and stay unchanged after that updated string -- this field should be updated each time the value is updated ); -- in-memory table which keeps the latest state define table tbl ( id int, total double, created string, updated string ); -- PROCESSING -- calculate total from inp select id, sum(value) as total, time:utcTimestamp() as created, time:utcTimestamp() as updated group by id insert into tmp; -- streaming to a temporary stream -- left join with the existing table from tmp left outer join tbl on tbl.id == tmp.id select -- this values will be inserted to the table if there is no record with ID tmp.id, tmp.total, coalesce(tbl.created, tmp.created) as created, tmp.updated update or insert into tbl set -- this block will update only those properties which are mentioned here tbl.total = total, tbl.updated = updated on tbl.id == id; -- in order to produce data stream with result -- I'll join the initial stream with table and put value from the table to the data stream from inp left outer join tbl on tbl.id == inp.id select tbl.id, tbl.total, tbl.created, tbl.updated insert into log;
Clarification about joining of a table with original stream. The table itself cannot produce any stream and it needs some triggering mechanism. In current case I’m using incommig stream as the triggering for generating the output stream.