Until now, our discussion of query semantics has ignored the timeliness of output delivery. We have proceeded on the assumption that every input event that enters the system is used in the computation and that the correct results are streamed out. However, for any output to be streamed out of a StreamInsight query, the input stream must contain special events called CTI (Current Time Increment) events. This section explains why your input stream needs these special events.
StreamInsight is a temporal query processing engine over complex event streams. It is built for applications where time – as in real clock time (“now” versus “over the past few seconds” versus “past two hours”) – plays a central role in the computation and comprehension of results. Each event that is processed by a query represents a payload (i.e., data elements) along with start and end timestamps that demarcate the lifetime of that event.
StreamInsight expects these start and end timestamps to be assigned by the application; in other words, they are application timestamps. StreamInsight does NOT implicitly assign any values to these timestamps: the program that enqueues events into the server sets these timestamps from a source. We will see shortly why this is an important distinction.
So far, all of our discussion of query processing has been at the “logical” level. To define query semantics, we used an input event table, built the time plot to understand how events interact with each other, and derived the output event table. In all of our examples, the StartTime of each newly arriving event was equal to, or at least one tick greater than, that of the event that arrived just before it. The End Time of an interval event may have overlapped or exceeded that of its predecessor, which is not a problem. In short, we assumed that the application StartTimes of all events arriving at a StreamInsight server were ordered in relation to the system clock of the host server; that is, they were Ordered Events.
Ordered Events are achievable when the StreamInsight server runs on the same machine as the event source. But in many real-life deployments, for various reasons, the server and the event sources and sinks run on separate machines connected by networks. Communication between these systems adds latency to the delivery of events from one system to another, which makes it possible for events to arrive at a StreamInsight server with their timestamps out of order in relation to the system clock of the host server. StreamInsight queries are designed from the ground up to let the user accommodate such Out Of Order Events.
Note that a StreamInsight query may contain both stateful operators (Joins, built-in and user-defined Aggregates, user-defined Operators) and stateless operators (Project, Group and Apply, Union, Filter). The ordering of events can directly affect the results of stateful operators, as the following example of a Count over tumbling windows shows.
First, let's understand the time plot better. The X axis shows the progression of application time. The event labels, however, reflect the order in which the events arrived according to the clock on the host server. For simplicity, we consider only point events. The actual values of these local (system clock) timestamps do not matter; only their relative ordering does. The following tables spell out what “out of order” means: in the second table, on the right, e4 arrives with an application timestamp that is out of order relative to the system (wall clock) time. Because e3 is the first of the two events to arrive, it simply establishes a reference point; the next event, e4, can then be either in order or out of order relative to it. Note that in this example the tumbling windows report a result for every 3 minutes of application time.
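Left table (events in order):

| Event | Application Time | System Time |
|---|---|---|
| e3 | 12:03:00 | 10:00:00 |
| e4 | 12:07:00 | 10:02:00 |

Right table (e4 out of order):

| Event | Application Time | System Time |
|---|---|---|
| e3 | 12:08:00 | 10:00:00 |
| e4 | 12:07:00 | 10:02:00 |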
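The check behind the two tables above can be sketched as follows: walk the events in arrival (system clock) order and flag any event whose application timestamp is earlier than the latest application timestamp already seen. This is a minimal Python sketch using only the e3/e4 timestamps from the tables; the function name and the tuple layout are illustrative assumptions, not part of StreamInsight.

```python
from datetime import datetime

def parse(hms: str) -> datetime:
    """Parse an HH:MM:SS string (the date part is irrelevant here)."""
    return datetime.strptime(hms, "%H:%M:%S")

def out_of_order(events):
    """Return labels of events whose application time is earlier than the
    highest application time among events that arrived before them.

    `events` is a list of (label, application_time, system_time) tuples.
    """
    flagged = []
    latest_app_time = None
    # Process events in the order they reached the server (system clock order).
    for label, app_time, _sys_time in sorted(events, key=lambda e: e[2]):
        if latest_app_time is not None and app_time < latest_app_time:
            flagged.append(label)          # arrived "in the past"
        else:
            latest_app_time = app_time     # application time moves forward

    return flagged

left_table = [("e3", parse("12:03:00"), parse("10:00:00")),
              ("e4", parse("12:07:00"), parse("10:02:00"))]
right_table = [("e3", parse("12:08:00"), parse("10:00:00")),
               ("e4", parse("12:07:00"), parse("10:02:00"))]

print(out_of_order(left_table))   # []
print(out_of_order(right_table))  # ['e4']
```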
Given this understanding, the count values for the five tumbling windows from the two time plots are:
Left table (events in order):

| Event | StartTime | Count |
|---|---|---|
| o1 | 12:00 | 2 |
| o2 | 12:03 | 1 |
| o3 | 12:06 | 1 |
| o4 | 12:09 | 2 |
| o5 | 12:12 | 1 |

Right table (events out of order):

| Event | StartTime | Count |
|---|---|---|
| o1 | 12:00 | 2 |
|  | 12:03 | -- |
| o2 | 12:06 | 2 |
| o3 | 12:09 | 1 |
| o4 | 12:12 | 1 |
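Assigning a point event to a tumbling window is a floor operation on its start time. The Python sketch below is limited to e3 and e4, because the other events' timestamps appear only in the time plots; it shows how e3 moves from the 12:03 window to the 12:06 window between the two scenarios. The 12:00 window origin and the helper names are assumptions for illustration.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=3)                  # tumbling window size in this example
ORIGIN = datetime.strptime("12:00", "%H:%M")   # assumed alignment of the first window

def window_start(t: datetime) -> datetime:
    """Start of the tumbling window that contains point event time t."""
    n = (t - ORIGIN) // WINDOW    # number of whole windows since ORIGIN
    return ORIGIN + n * WINDOW

def counts(start_times):
    """Count point events per tumbling window, keyed by 'HH:MM' window start."""
    c = Counter(window_start(datetime.strptime(s, "%H:%M")) for s in start_times)
    return {w.strftime("%H:%M"): n for w, n in sorted(c.items())}

print(counts(["12:03", "12:07"]))   # left:  {'12:03': 1, '12:06': 1}
print(counts(["12:08", "12:07"]))   # right: {'12:06': 2}
```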
If you agree with these results, it is because of two implicit assumptions in your understanding of the example:
1. That the count is computed for a window defined by specific start and end times, spanning 3 time units. This is correct, and the window size is a constant.
2. That we are asking the query to report the output once every three minutes, aligned with the window boundaries (i.e., we are asking for one output at each row of the table).
Assume that our application acts on the output of this query with this logic:
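// count: the Count value of the tumbling-window output event being processed
if (count < 2)
{
    BuyOrder();    // fewer than two events in the window
}
else
{
    SellOrder();   // two or more events in the window
}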
Then, with no out of order events (table on the left), o2 would have triggered a BuyOrder by virtue of e3’s arrival. With the events arriving out of order (table on the right), the action would be a SellOrder, albeit for a different window (12:06). If this application were a trading station, we would discover this mistake only in a post-audit of the transaction, and would have to undertake some compensatory action.
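To make the divergence concrete, the window that contains e3 in each scenario can be fed through the same rule. In this Python sketch, `decide` is a hypothetical stand-in for the BuyOrder()/SellOrder() calls above, and the counts are copied from the two output tables.

```python
def decide(count: int) -> str:
    """Mirror of the application rule: buy on a low count, otherwise sell."""
    return "BuyOrder" if count < 2 else "SellOrder"

# Window containing e3 in each scenario, with its count from the output tables.
left = {"window": "12:03", "count": 1}    # ordered arrival (o2 on the left)
right = {"window": "12:06", "count": 2}   # out-of-order arrival (o2 on the right)

for name, out in (("left", left), ("right", right)):
    print(name, out["window"], decide(out["count"]))
```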
Now, assume that we know that the network transmission is unreliable, and there is a possibility of events arriving out of order into the StreamInsight server. Instead of every 3 minutes, assume that we request that the output be reported once every six minutes. Then the second table becomes:
| Event | StartTime | Count |
|---|---|---|
| o1 | 12:00 | 2 |
| o2, o3 | 12:06 | 2 |
| o4 | 12:12 | 1 <tbd> |
Again, each row of the table signifies one report of the output. By reporting less often, we have accommodated the late arrival of event e3, and thereby presented a correct (SellOrder) result even in the face of unordered input. But this correctness comes at a price: we have doubled the latency of the query output (or, put another way, reduced its “liveliness”).
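The same trade-off can be reproduced with a small simulation. The Python sketch below reports each 3-minute tumbling window only after application time has moved a chosen slack past the window's end: zero slack is the lively option, and a larger slack waits for stragglers. The arrival sequence is hypothetical (not the events from the time plots), and this is a simplified model of the idea, not how StreamInsight itself schedules output.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=3)                  # tumbling window size
ORIGIN = datetime.strptime("12:00", "%H:%M")   # assumed alignment of the windows

def t(hm: str) -> datetime:
    return datetime.strptime(hm, "%H:%M")

def window_start(x: datetime) -> datetime:
    return ORIGIN + ((x - ORIGIN) // WINDOW) * WINDOW

def run(arrivals, slack: timedelta):
    """Count point events per tumbling window, reporting a window once the highest
    application time seen so far is at least `slack` past the window's end.

    Returns (reports, late): reports maps 'HH:MM' window starts to emitted counts;
    late lists events whose window had already been reported when they arrived.
    """
    open_counts = {}   # window start -> running count, not yet reported
    reported = {}      # window start -> count that was emitted
    late = []
    now = None         # highest application time seen so far
    for x in arrivals:                        # arrival (system clock) order
        now = x if now is None else max(now, x)
        w = window_start(x)
        if w in reported:
            late.append(x.strftime("%H:%M"))  # result already emitted; event is missed
        else:
            open_counts[w] = open_counts.get(w, 0) + 1
        for ws in sorted(open_counts):        # sorted() copies the keys, so popping is safe
            if ws + WINDOW + slack <= now:
                reported[ws] = open_counts.pop(ws)
    return {w.strftime("%H:%M"): n for w, n in reported.items()}, late

# Hypothetical arrival sequence: the 12:02 event arrives after the 12:04 event.
arrivals = [t("12:01"), t("12:04"), t("12:02"), t("12:07")]
print(run(arrivals, timedelta(0)))           # lively: report as soon as a window closes
print(run(arrivals, timedelta(minutes=3)))   # waits one extra window length
```

With zero slack, the 12:00 window is reported as soon as the 12:04 event arrives, so the straggler at 12:02 is missed. With 3 minutes of slack, the same window is reported only when the 12:07 event arrives, but its count is correct.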
The key takeaways from this discussion are:
1. StreamInsight handles out-of-order events intrinsically in its query algebra.
2. StreamInsight provides a mechanism for the user to make a conscious compromise between liveliness of the query, at the expense of possibly incorrect output, and absolute correctness, at the expense of slower reporting of output.
The mechanism for conveying this compromise to the query, via the input event stream, is a special punctuation event called a Current Time Increment (CTI). This is the topic of our next section.
A CTI event acts as a watermark in the stream, and thereby affects two fundamental attributes of the query:
1. The liveliness of the query – the pace at which the query is able to emit output.
2. The statefulness of the query – the pace at which the query is able to release events that contribute towards state (for joins, aggregates, etc.).
You can explicitly issue a CTI event from the input adapter by calling the EnqueueCtiEvent() method, passing the CTI timestamp as a parameter.
When the streaming application enqueues a CTI into the input stream, it accomplishes two things:
1. It guarantees to the query that, from the point at which the CTI is issued, the application will not enqueue any event whose Start Time is less than the timestamp of the CTI event (i.e., the query will not see an event that is earlier in application time than the CTI event). If such an event is nevertheless enqueued into the query, the query raises a “CTI Violation” exception.
The figure below depicts a scenario in which event e5 arrives with a timestamp of 12:07 after a CTI event with timestamp 12:09 has been issued, which causes a CTI violation.
IMPORTANT: In your program, the CTI violation exception occurs only when you enqueue CTI events explicitly, without using AdvanceTimeSettings to define your CTI progression. This mechanism is discussed below.
2. It informs the query that any stateful operator that has processed events arriving before the CTI can release those events, under the following specific conditions:
a. an interval event that preceded this CTI event can be released IFF the CTI's timestamp is greater than the End Time of the interval event.
b. a point event that preceded this CTI event can be released IFF the CTI's timestamp is greater than the Start Time of the point event.
c. an edge event (whether Start or End Edge) that preceded this CTI event can be released IFF the CTI's timestamp is greater than the Start Time of the edge event.
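Both effects of a CTI described above can be modeled in a few lines. The Python sketch below is a toy operator state, not the StreamInsight API: `CtiViolation`, `ToyOperatorState`, `Event`, and the buffered events e1 and e2 are all hypothetical. It rejects an event whose Start Time is earlier than the last CTI (reproducing the e5 scenario from the figure) and, on each CTI, releases the buffered events that satisfy rules a, b, and c.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

class CtiViolation(Exception):
    """Raised when an event starts before the most recent CTI (hypothetical name)."""

@dataclass
class Event:
    label: str
    kind: str                        # "interval", "point", or "edge"
    start: datetime
    end: Optional[datetime] = None   # used only for interval events

def releasable(ev: Event, cti: datetime) -> bool:
    """Rules a, b, and c: may a buffered event be released at this CTI?"""
    if ev.kind == "interval":
        return cti > ev.end          # rule a: CTI later than the End Time
    if ev.kind in ("point", "edge"):
        return cti > ev.start        # rules b and c: CTI later than the Start Time
    raise ValueError(f"unknown event kind: {ev.kind}")

class ToyOperatorState:
    """Buffer of events held by a stateful operator, driven by CTIs."""

    def __init__(self):
        self.last_cti: Optional[datetime] = None
        self.buffer: list = []

    def enqueue_event(self, ev: Event) -> None:
        # Effect 1: the CTI promised that nothing earlier than it would follow.
        if self.last_cti is not None and ev.start < self.last_cti:
            raise CtiViolation(
                f"{ev.label} starts at {ev.start:%H:%M}, before CTI {self.last_cti:%H:%M}")
        self.buffer.append(ev)

    def enqueue_cti(self, ts: datetime) -> list:
        """Effect 2: advance the watermark and return labels of released events."""
        # Assumption: the watermark never moves backward.
        self.last_cti = ts if self.last_cti is None else max(self.last_cti, ts)
        released = [e.label for e in self.buffer if releasable(e, self.last_cti)]
        self.buffer = [e for e in self.buffer if not releasable(e, self.last_cti)]
        return released

def t(hm: str) -> datetime:
    return datetime.strptime(hm, "%H:%M")

state = ToyOperatorState()
state.enqueue_event(Event("e1", "interval", t("12:00"), t("12:10")))   # hypothetical
state.enqueue_event(Event("e2", "point", t("12:05")))                  # hypothetical
print(state.enqueue_cti(t("12:09")))   # ['e2']: e1 ends at 12:10, so it is kept
try:
    state.enqueue_event(Event("e5", "point", t("12:07")))   # the scenario in the figure
except CtiViolation as err:
    print("CTI violation:", err)
```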
Given this background, it is easy to see how CTI events help strike the compromise between blocking for correctness of output (i.e., waiting for unordered events) and achieving a lively stream (i.e., not waiting for events, and accepting that some output results may be incorrect). In the above figure, point events e3 and e4 are unordered; if a CTI had been issued after every event, the output would have contained an error. But if this error were acceptable, then issuing a CTI after every event (as opposed to once every 3 minutes) would have resulted in a maximally responsive query.
Important: If you know that events arrive into a query ordered by time, it is highly recommended that you issue a CTI coinciding with the Start Time of every event that is enqueued.
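For input known to be ordered, this recommendation amounts to following every INSERT with a CTI stamped at that event's Start Time. A minimal Python sketch of the interleaving follows; the ("INSERT", ...) and ("CTI", ...) tuples are hypothetical stand-ins for the adapter's enqueue calls, not StreamInsight types.

```python
from datetime import datetime

def with_ctis(ordered_events):
    """Yield an INSERT followed by a CTI at the same Start Time for each event.

    `ordered_events` yields (label, start_time) pairs already sorted by start time.
    """
    previous = None
    for label, start in ordered_events:
        # The CTI at `previous` promised that nothing would start earlier than it.
        if previous is not None and start < previous:
            raise ValueError(f"{label} is out of order; it would violate the last CTI")
        yield ("INSERT", label, start)
        yield ("CTI", None, start)
        previous = start

def t(hm: str) -> datetime:
    return datetime.strptime(hm, "%H:%M")

for kind, label, ts in with_ctis([("e1", t("12:00")), ("e2", t("12:01")), ("e3", t("12:03"))]):
    print(kind, f"{ts:%H:%M}", label or "")
```

Because the CTI equals the event's own Start Time, a later event with the same Start Time is still legal: only events starting strictly earlier than the CTI are violations.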
A practical problem with CTIs is that it is often hard to predict exactly when to issue a CTI explicitly. There are a couple of ways a developer can implement the streaming application (or, more precisely, the input adapter) to issue CTIs:
1. Populate the input stream itself with CTI markers after one or more INSERT events; the input adapter then uses each marker as the cue to call EnqueueCtiEvent(). This is feasible if the event source is persisted, but even then, seeding the input stream with CTI markers is cumbersome and adds overhead.
2. Programmatically enqueue CTIs from the input adapter. You can specify a CTI frequency, such as one CTI every N events, through the adapter configuration, and the adapter can use this setting to invoke EnqueueCtiEvent() once every N events. This, however, makes the adapter implementation complex: if EnqueueCtiEvent() fails because of pushback from the engine (see the Adapter documentation), you have to keep track of the pending CTI and enqueue it along with the INSERT events at the next opportunity the engine provides.
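The bookkeeping that option 2 forces on the adapter can be sketched as follows. This is a hypothetical Python model: `FlakyEngine.try_enqueue` returning False stands in for engine pushback, and `EveryNCtiAdapter` keeps refused items (including a CTI) in a FIFO queue so they are retried, in order, at the next opportunity. The real pushback protocol is described in the Adapter documentation and is not reproduced here.

```python
from collections import deque
from datetime import datetime

class FlakyEngine:
    """Hypothetical engine stub that refuses every `refuse_every`-th enqueue (pushback)."""

    def __init__(self, refuse_every=3):
        self.refuse_every = refuse_every
        self.calls = 0
        self.accepted = []        # what actually made it into the stream, in order

    def try_enqueue(self, item) -> bool:
        self.calls += 1
        if self.calls % self.refuse_every == 0:
            return False          # simulated pushback: caller must retry later
        self.accepted.append(item)
        return True

class EveryNCtiAdapter:
    """Enqueue a CTI after every N inserts, retrying items refused by the engine."""

    def __init__(self, engine, n):
        self.engine, self.n = engine, n
        self.pending = deque()    # inserts and CTIs still waiting to be enqueued
        self.since_cti = 0
        self.max_start = None

    def push(self, label, start):
        self.pending.append(("INSERT", label, start))
        self.max_start = start if self.max_start is None else max(self.max_start, start)
        self.since_cti += 1
        if self.since_cti == self.n:
            # Assumes no later event will start before max_start (ordered input).
            self.pending.append(("CTI", None, self.max_start))
            self.since_cti = 0
        self.drain()

    def drain(self):
        # Stop at the first refusal so order is kept; the rest is retried later.
        while self.pending:
            if not self.engine.try_enqueue(self.pending[0]):
                return
            self.pending.popleft()

def t(minute: int) -> datetime:
    return datetime(2000, 1, 1, 12, minute)   # hypothetical timestamps 12:mm

engine = FlakyEngine(refuse_every=3)
adapter = EveryNCtiAdapter(engine, n=2)
for i in range(4):
    adapter.push(f"e{i}", t(i))
while adapter.pending:        # keep retrying until everything is accepted
    adapter.drain()
print([(kind, label) for kind, label, _ in engine.accepted])
```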
The solution, therefore, is to automate the enqueuing of CTIs declaratively. StreamInsight lets you specify this either as an interface implementation in the adapter factory (see the Adapter documentation) or as part of the query binding specification itself. See the product documentation on AdvanceTimeSettings.
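To show what the declarative approach buys, the sketch below moves CTI generation out of the adapter logic into a small wrapper configured with an event count, a delay, and a policy for events that arrive behind the last CTI. This is a Python model of the idea only: the parameter names (`every_n`, `delay`, `policy`) are hypothetical, it handles point events only, and the actual options and their exact semantics are described in the product documentation on AdvanceTimeSettings.

```python
from datetime import datetime, timedelta

def generate(events, every_n, delay, policy):
    """Yield ("INSERT", label, start) and ("CTI", None, ts) items for point events.

    A CTI is emitted after every `every_n` inserts, stamped at the highest start
    time seen so far minus `delay`. Events that start before the last CTI are
    dropped (policy="drop") or moved forward to the CTI (policy="adjust").
    """
    if policy not in ("drop", "adjust"):
        raise ValueError(f"unknown policy: {policy}")
    last_cti, high, count = None, None, 0
    for label, start in events:
        if last_cti is not None and start < last_cti:
            if policy == "drop":
                continue                     # discard the violating event
            start = last_cti                 # "adjust": clamp to the CTI timestamp
        yield ("INSERT", label, start)
        high = start if high is None else max(high, start)
        count += 1
        if count == every_n:
            ts = high - delay
            if last_cti is None or ts > last_cti:   # CTIs only move forward
                last_cti = ts
                yield ("CTI", None, ts)
            count = 0

def t(hm: str) -> datetime:
    return datetime.strptime(hm, "%H:%M")

# Hypothetical arrivals: e3 (12:03) arrives after e2 (12:05).
arrivals = [("e1", t("12:00")), ("e2", t("12:05")), ("e3", t("12:03")), ("e4", t("12:06"))]
for delay in (timedelta(0), timedelta(minutes=3)):
    inserted = [lbl for kind, lbl, _ in generate(arrivals, 1, delay, "drop") if kind == "INSERT"]
    print(delay, inserted)
```

With no delay, e3 falls behind the 12:05 CTI and is dropped; with a 3-minute delay it is kept, at the cost of CTIs (and therefore output) trailing the data by 3 minutes. This is the same liveliness-versus-correctness compromise discussed earlier, expressed as configuration rather than adapter code.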