Siddhi: Count events and issue only the last one with the total value

In stream processing, each event is passed from one processing block to the next as soon as it arrives. For example, suppose you receive three events like these:

{id: 1, value: 1}
{id: 1, value: 1}
{id: 1, value: 1}

You want to calculate the sum of value and emit only one result event, {id: 1, value: 3}. This is not trivial: even if you write a counter over a time window, every incoming event still produces an output, so you end up with three events:

{id: 1, value: 1}
{id: 1, value: 2}
{id: 1, value: 3}
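To see why, consider a minimal Python model (not Siddhi code) of such a counter, assuming all three events fall inside the same window. The `running_sum` function is hypothetical: it keeps a cumulative total per id and, like a streaming query, emits an output for every input.

```python
def running_sum(events):
    """Model of a streaming counter: one output per input, carrying the running sum per id."""
    totals = {}
    outputs = []
    for event in events:
        key = event["id"]
        totals[key] = totals.get(key, 0) + event["value"]
        # Every arriving event immediately produces an output event.
        outputs.append({"id": key, "value": totals[key]})
    return outputs


events = [{"id": 1, "value": 1} for _ in range(3)]
for out in running_sum(events):
    print(out)
```

Only the last output carries the total; the first two are intermediate results you do not want downstream.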

To get the expected behavior, you need to buffer the events and emit only the last result. This can be done by combining windowing and delay operations, and this article shows an example of how.

First, look at the example. If you send it the three events described above, this test application produces only one event, carrying the total sum of value. The code is explained below.

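@App:name("test")
@App:description("Test app")

-- Incoming events.
define stream input (id string, value int);

-- Stores the latest windowed sum for each id.
define table tbl (
    id string,
    value long
);

-- Re-emit every input event 5 seconds after it arrives.
from input#window.delay(5 seconds)
select *
insert into tmp3;

-- For each id, keep the sum over the last 5 seconds in the table.
partition with (id of input)
begin
    from input#window.time(5 seconds)
    select id, sum(value) as value
    update or insert into tbl
        set tbl.value = value
        on tbl.id == id;
end;

-- Attach the current total to each delayed event (0 if the id has no row yet).
from tmp3 as s left outer join tbl as t on t.id == s.id
select
    s.id,
    default(t.value, 0L) as value
insert into tmp4;

-- Let only one event per id through within 5 seconds.
from tmp4#unique:deduplicate(id, 5 seconds)
select *
insert into tmp5;

-- Log the result.
from tmp5#log()
select *
insert into t;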

table – you need it to store the latest aggregation result for each id, which the join reads later in the code.
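As a rough model, the table behaves like a dictionary keyed by id, and `update or insert into tbl ... on tbl.id == id` acts as an upsert: overwrite the row if the id exists, add it otherwise. The `upsert` helper below is hypothetical and only illustrates that behavior in Python.

```python
def upsert(table, row):
    """Model of `update or insert into tbl set tbl.value = value on tbl.id == id`."""
    # Replace the stored value if the id already exists, otherwise insert a new row.
    table[row["id"]] = row["value"]
    return table


tbl = {}
upsert(tbl, {"id": "1", "value": 1})
upsert(tbl, {"id": "1", "value": 2})
upsert(tbl, {"id": "1", "value": 3})
print(tbl)  # {'1': 3}
```

Because each aggregation overwrites the previous one, the table always holds only the most recent sum per id.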

#window.delay() – first, you need to delay incoming events for some period (5 seconds here). This defers their processing so that other values, such as the total sum, are ready by the time the events are released.
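A minimal Python model of what the delay does, assuming each event carries a hypothetical arrival timestamp `ts` in seconds and events arrive in order: every event is re-emitted unchanged, `seconds` later. The `delay` function is an illustration, not Siddhi's implementation.

```python
def delay(events, seconds):
    """Model of #window.delay: each event is released `seconds` after it arrived.

    Assumes `events` is in arrival order, so the release order is the same.
    """
    return [(event["ts"] + seconds, event) for event in events]


events = [{"ts": t, "id": "1", "value": 1} for t in (0, 1, 2)]
for release_time, event in delay(events, 5):
    print(release_time, event)
```

Events arriving at 0, 1 and 2 seconds are released at 5, 6 and 7 seconds, after all three have already been counted.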

partition with… and #window.time() – this groups events by id and aggregates them within a sliding time window. Use the same period for the window as for the delay (5 seconds), so that when the deferred events are released, their total sum is already in the table.
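A Python sketch of this step, assuming integer-second timestamps in a hypothetical `ts` field and events in arrival order. Keeping a separate history per id plays the role of the partition. It also assumes, as in Siddhi's default output mode, that only arriving (current) events update the table, so expiring events do not lower the stored sum. `windowed_sums` is a hypothetical name.

```python
def windowed_sums(events, window_seconds):
    """Model of a per-id sum over a sliding time window, upserted into a table on each arrival."""
    history = {}    # id -> [(ts, value)] still inside the window (one "partition" per id)
    table = {}      # models tbl: id -> latest windowed sum
    snapshots = []  # table contents after each arrival
    for event in events:
        key, now = event["id"], event["ts"]
        # Drop values that arrived window_seconds or more ago (they have expired).
        kept = [(ts, v) for ts, v in history.get(key, []) if now - ts < window_seconds]
        kept.append((now, event["value"]))
        history[key] = kept
        # Only arriving (current) events update the table.
        table[key] = sum(v for _, v in kept)
        snapshots.append((now, dict(table)))
    return table, snapshots


events = [{"ts": t, "id": "1", "value": 1} for t in (0, 1, 2)]
table, snapshots = windowed_sums(events, 5)
print(table)  # {'1': 3}

# An event arriving after the earlier ones have expired starts a fresh sum.
later, _ = windowed_sums(events + [{"ts": 10, "id": "1", "value": 1}], 5)
print(later)  # {'1': 1}
```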

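join … – here you join the deferred events with their total sums. At this moment the table holds the final total, but you still get one output per deferred event (3 events in our case), each carrying that total. The left outer join together with default(t.value, 0L) produces 0 when the table has no row for the id.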
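A Python model of the join, assuming the table is a dictionary keyed by id: each delayed event looks up its id, and `dict.get(..., 0)` stands in for `default(t.value, 0L)`. The `join_with_table` function is hypothetical.

```python
def join_with_table(delayed_events, table):
    """Model of `tmp3 left outer join tbl` followed by default(t.value, 0L)."""
    joined = []
    for event in delayed_events:
        # A left outer join keeps the stream event even when no table row matches;
        # the missing value is then replaced by 0.
        joined.append({"id": event["id"], "value": table.get(event["id"], 0)})
    return joined


table = {"1": 3}
delayed = [{"id": "1", "value": 1} for _ in range(3)]
print(join_with_table(delayed, table))
# [{'id': '1', 'value': 3}, {'id': '1', 'value': 3}, {'id': '1', 'value': 3}]
print(join_with_table([{"id": "2", "value": 5}], table))
# [{'id': '2', 'value': 0}]
```

All three outputs now carry the correct total; what remains is to drop the duplicates.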

#unique:deduplicate – this block lets only one event per unique id pass within the given interval (5 seconds). In the end you get a single event with the total sum of value.
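Putting the pieces together, here is an end-to-end Python simulation of the whole application. It models the behavior only and is not Siddhi's implementation. Assumptions: integer-second timestamps in a hypothetical `ts` field, arrivals processed before releases at equal timestamps, and a simplified deduplication rule (an event passes only if no event with the same id passed in the previous interval). The names `deduplicate` and `pipeline` are hypothetical.

```python
def deduplicate(timed_events, interval):
    """Simplified model of #unique:deduplicate(id, interval) over (time, event) pairs."""
    last_passed = {}
    passed = []
    for ts, event in timed_events:
        previous = last_passed.get(event["id"])
        if previous is None or ts - previous >= interval:
            last_passed[event["id"]] = ts
            passed.append(event)
    return passed


def pipeline(events, period=5):
    """Delay -> per-id windowed sum in a table -> left outer join -> deduplicate."""
    history = {}  # id -> [(ts, value)] inside the sliding window
    table = {}    # models tbl
    joined = []   # models tmp4 as (release_time, event)
    # Kind 0 = original arrival, kind 1 = release from the delay window.
    timeline = sorted(
        [(e["ts"], 0, e) for e in events] + [(e["ts"] + period, 1, e) for e in events],
        key=lambda item: (item[0], item[1]),
    )
    for now, kind, e in timeline:
        key = e["id"]
        if kind == 0:
            kept = [(ts, v) for ts, v in history.get(key, []) if now - ts < period]
            kept.append((now, e["value"]))
            history[key] = kept
            table[key] = sum(v for _, v in kept)
        else:
            joined.append((now, {"id": key, "value": table.get(key, 0)}))
    return deduplicate(joined, period)


events = [{"ts": t, "id": "1", "value": 1} for t in (0, 1, 2)]
print(pipeline(events))  # [{'id': '1', 'value': 3}]
```

The three joined events are released at 5, 6 and 7 seconds; only the first passes the deduplication step, and it already carries the total.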
