Siddhi: Partially updating an in-memory table

Siddhi is a language in the family of streaming SQL languages, offering a SQL-like approach to processing data streams. This short code snippet shows one of the basic scenarios: partially updating a record in an in-memory table, or creating a new record if none exists yet. The sample is built around calculating a running total per ID.

This example was tested in the Editor tool of WSO2 Stream Processor.

@App:name(" app name")
@App:description("Application description")

define stream inp (
    id int, -- key
    value double
);

-- I'll stream into log to show the result
@sink(type='log')
define stream log (
	id int,
	total double,
	created string, -- this property should get its value once and stay unchanged after that
	updated string -- this field should be updated each time the value is updated
);

-- in-memory table which keeps the latest state
define table tbl (
	id int,
	total double,
	created string,
	updated string
);

-- PROCESSING

-- calculate total

from inp
select
	id,
	sum(value) as total,
	time:utcTimestamp() as created,
	time:utcTimestamp() as updated
group by id
insert into tmp; -- streaming to a temporary stream

-- left join with the existing table
from tmp left outer join tbl
    on tbl.id == tmp.id
select
    -- these values will be inserted into the table if there is no record with this ID
    tmp.id,
    tmp.total,
    coalesce(tbl.created, tmp.created) as created,
    tmp.updated
update or insert into tbl
    set
        -- this block will update only those properties which are mentioned here
        tbl.total = total,
        tbl.updated = updated
	on tbl.id == id;

-- to produce an output stream with the result,
-- I'll join the initial stream with the table and emit the values from the table
from inp left outer join tbl
    on tbl.id == inp.id
select
	tbl.id,
	tbl.total,
	tbl.created,
	tbl.updated
insert into log;

A clarification about joining the table with the original stream: a table cannot produce a stream by itself, so it needs some triggering mechanism. In this case I'm using the incoming stream as the trigger for generating the output stream.
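
Since the table only needs something to trigger the read, the incoming stream is not the only option. As a minimal sketch, a periodic Siddhi trigger could drive the output instead. This assumes the `tbl` and `log` definitions from the listing above. The trigger name `everyFiveSec` and the 5-second interval are illustrative choices, and this variant was not part of the tested example.

```
-- hypothetical periodic trigger (name and interval are illustrative)
define trigger everyFiveSec at every 5 sec;

-- each trigger event is joined with every row currently in the table
from everyFiveSec join tbl
select
    tbl.id,
    tbl.total,
    tbl.created,
    tbl.updated
insert into log;
```

Note the difference in behavior: joining with the incoming stream emits one row per incoming event, while a trigger-based join with no `on` condition would emit the whole table content on every tick, whether or not anything changed.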
