So far, we have based our examples on a single toll booth. A toll station has several toll booths, and most highways or urban areas will have multiple toll stations. For example, there are 20 such toll stations in a 20 mile radius around New York City that collectively handle 1.4 million crossings a day. ((1.4 × 10^6 / 20) / 24) / 3600 ≈ 0.8, which implies an average of a little under one vehicle per second at a toll station, and that is not much. But if you assume that most toll stations are integrated with sensor-based traffic monitoring systems on main arteries and roadways, the volume of events and the kinds of interesting queries that you can ask will demand a scalable querying mechanism. Some examples:
• Which toll stations are congested – now, over the last N minutes/hours? (integration with traffic management)
• Which toll booths are most active in a given station? (this helps in planning road work, shifts, routing)
• What is the inbound (from suburbs to city) versus outbound (city to suburbs) volume of vehicles at a toll station?
• What is the count of transgressions (vehicles with no/expired tags) for a given automated booth at a given station?
These questions require grouping and partitioning of vehicles across different toll stations and possibly other logical hierarchies of related objects in the application. The computations themselves may be far more sophisticated than the simple count aggregate we have seen so far. The Group and Apply operator in StreamInsight addresses this need for partitioned, scaled-out processing. We will explain this feature and introduce a new windowing construct using a simple grouping query:
[Partitioned Hopping window] Find the toll generated from vehicles being processed at each toll booth at some time over a 3 minute window, with the window advancing in one minute hops. Provide the value as of the last reported result.
Figure 10. Hopping windows with Group and Apply
Step 1 Model input and output events from the application’s perspective.
We’ll have interval events for input and point events for the output.
Step 2 Understand the desired query semantics by constructing sample input and output CHTs.
The time plot for this input is shown in Figure 10. For this example we extend the time plot by adding events for toll booth 2, colored red, and by adding toll amounts, presented next to the event name in parentheses. For simplicity we show only events from these two booths. The green lines at the bottom of the time plot show the progress of a hopping window of size 3 minutes, with a hop size of 1 minute.
In this query, the output is the sum of the toll generated in each hopping window and is computed on a per-booth basis. As we have seen earlier, hopping windows are very chatty and generate a lot of output, so for brevity, we will not show the complete result here. A partial result is shown on the bottom track of the time plot in Figure 12. For clarity we colored events according to the booth group from which they were produced. They are also annotated with the resulting toll sum and the set of events that contributed to it, as we did in previous examples.
Figure 11. Query Graph for [Partitioned Hopping window] query
The important takeaways from the time plot are:
• An event output is generated for each hopping window for each group of events (in this case 2 groups).
• The aggregate operation (Sum) is repeated for each hopping window.
• The same set of events repeat themselves in some windows. For example, events o11, o13, and o15 compute the Sum() operation over the same set of two events { e4, e7 }.
Figure 12. Partial output events from [Partitioned Hopping window] query
Step 3 Gather elements of query logic to compute the output and develop an event flow graph.
• Group and Apply – we need a grouping operator to group the events in the stream based on Booth ID.
• Window – we choose a hopping window of size 3 minutes, with a hop size of 1 minute, as per the requirements.
• Sum() – we use this aggregation operator to return the total toll amount within a window.
Step 4 Compose the query as a streaming transformation of input to output.
In [Partitioned Hopping window], the grouping clause groups events by TollId into groups per toll booth. A hopping window is defined on each group, and within each hopping window a Sum is computed against the value of the Toll. As you can see, the query formulation is very intuitive and follows the English description to a tee.
var query = from e in inputStream
            group e by e.TollId into perTollBooth
            from win in perTollBooth.HoppingWindow(
                TimeSpan.FromMinutes(3),
                TimeSpan.FromMinutes(1))
            select new Toll
            {
                TollId = perTollBooth.Key,
                TollAmount = win.Sum(e => e.Toll)
            };
In this example, we use Sum as the aggregate. Built-in aggregates are incremental, so recomputing them over overlapping windows is not much of a problem. While the cost may seem trivial in this example, aggregates in real-life scenarios can be fairly complex, and each window computation has to load the same set of events into its state. High-volume streams in applications like trading platforms can have tens of thousands, if not millions, of such groups. The choice of a hopping window in such scenarios can become expensive.
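To make the repetition concrete, here is a minimal sketch that mimics the partitioned hopping-window sum with plain LINQ to Objects. It does not use the StreamInsight API. The TollEvent class, the sample events, the whole-minute timestamps, and the rule that an event belongs to every window its lifetime overlaps are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class HoppingWindowSketch
{
    // Hypothetical event shape; times are whole minutes from an arbitrary origin.
    public sealed class TollEvent
    {
        public string Name = "";
        public int TollId;
        public int Start;    // inclusive
        public int End;      // exclusive
        public decimal Toll;
    }

    // Plain LINQ stand-in for "group by TollId, hopping window, Sum".
    // Assumes Start >= 0 and that size is a multiple of hop.
    public static List<(int TollId, int WindowStart, decimal Sum, string Members)> HoppingSums(
        IReadOnlyList<TollEvent> events, int size, int hop)
    {
        var results = new List<(int TollId, int WindowStart, decimal Sum, string Members)>();
        foreach (var booth in events.GroupBy(e => e.TollId).OrderBy(g => g.Key))
        {
            int last = booth.Max(e => e.End);
            // Windows are [w, w + size), aligned to multiples of hop.
            for (int w = hop - size; w < last; w += hop)
            {
                // An event contributes to every window that its lifetime overlaps.
                var members = booth.Where(e => e.Start < w + size && e.End > w).ToList();
                if (members.Count == 0) continue; // empty windows produce no output
                results.Add((booth.Key, w, members.Sum(e => e.Toll),
                             string.Join(",", members.Select(e => e.Name))));
            }
        }
        return results;
    }

    // Made-up sample data, not the events from the figures.
    public static IReadOnlyList<TollEvent> SampleEvents() => new[]
    {
        new TollEvent { Name = "e1", TollId = 1, Start = 0, End = 1, Toll = 4m },
        new TollEvent { Name = "e2", TollId = 1, Start = 2, End = 3, Toll = 6m },
        new TollEvent { Name = "e3", TollId = 2, Start = 1, End = 2, Toll = 5m },
    };

    public static void Main()
    {
        const int size = 3, hop = 1;
        foreach (var r in HoppingSums(SampleEvents(), size, hop))
            Console.WriteLine(
                $"booth {r.TollId} window [{r.WindowStart},{r.WindowStart + size}) sum={r.Sum} events={r.Members}");
    }
}
```

With these three made-up events, each event falls into three consecutive windows, so three inputs produce eight outputs. That multiplication is the cost the paragraph above refers to.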
A windowing construct whose size and forward progression is truly influenced by the incoming events, rather than a static window definition, is the Snapshot Window. Rather than partitioning time into fixed-size buckets, a snapshot window segments time according to the start and end timestamps of all occurring changes. The duration of a snapshot window is defined by a pair of closest event end points (in terms of the event StartTime and EndTime timestamps) from the same or different events. To clarify, consider two events with start and end timestamps {A, 12:10, 12:30} and {B, 12:15, 12:25}. This yields three snapshot windows {w1, 12:10, 12:15}, {w2, 12:15, 12:25} and {w3, 12:25, 12:30}. Thus, unlike a hopping window, the time span of a snapshot window never contains the start or end timestamp of an individual event in its interior; the event end points themselves define the window boundaries.
To explain this, we will recast [Partitioned Hopping window] to use the snapshot window instead.
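The two-event example above can be reproduced with a short sketch. This is plain C#, not the StreamInsight API: the SnapshotWindows helper is a hypothetical name, and times of day are represented as TimeSpan values purely for convenience.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SnapshotWindowSketch
{
    // Hypothetical helper, not a StreamInsight API: derives snapshot window
    // boundaries from the start and end timestamps of interval events.
    public static List<(TimeSpan Start, TimeSpan End)> SnapshotWindows(
        IEnumerable<(TimeSpan Start, TimeSpan End)> events)
    {
        var list = events.ToList();

        // Every distinct start or end timestamp is a potential window boundary.
        var points = list
            .SelectMany(e => new[] { e.Start, e.End })
            .Distinct()
            .OrderBy(t => t)
            .ToList();

        var windows = new List<(TimeSpan Start, TimeSpan End)>();
        for (int i = 0; i + 1 < points.Count; i++)
        {
            var start = points[i];
            var end = points[i + 1];
            // Keep only the segments that at least one event overlaps.
            if (list.Any(e => e.Start < end && e.End > start))
                windows.Add((start, end));
        }
        return windows;
    }

    public static void Main()
    {
        var events = new[]
        {
            (Start: new TimeSpan(12, 10, 0), End: new TimeSpan(12, 30, 0)), // event A
            (Start: new TimeSpan(12, 15, 0), End: new TimeSpan(12, 25, 0)), // event B
        };
        foreach (var w in SnapshotWindows(events))
            Console.WriteLine($"{w.Start:hh\\:mm} to {w.End:hh\\:mm}");
    }
}
```

For events A and B this yields the three windows w1, w2, and w3 listed in the text. Note that no boundary ever falls strictly inside a window, because every boundary ends one window and starts the next.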
[Partitioned Sliding window] Find the most recent toll generated from vehicles being processed at each station over a one minute window, reporting the result every time a change occurs in the input.
Step 1 Model input and output events from the application’s perspective. No change from [Partitioned Hopping window].
Step 2 Understand the desired query semantics by constructing sample input and output CHTs.
The phrase “every time a change occurs” in the above statement points us towards snapshot, rather than hopping, windows. Along with this, we are required to report the most recent toll amount collected over a one minute window. In other words, we need to find the aggregate toll amount over the last snapshot window lasting one minute from any given point in time along the time axis. What we have just done is to essentially request a Sliding Window - a window that slides along the time axis, but with the output being reported whenever an event arrives into the system.
To compute the aggregate, you want to pick a point on the time axis - say, “Now” - and look back one minute. But recall that it is the events that define the snapshot. So how do we simulate the idea of “look for events back one minute”? The answer is simple – for each event that arrives, you make the event look forward one minute. In other words, you extend the duration (i.e., the lifetime) of the event by one minute into the future.
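The look-forward trick can be sketched in plain LINQ to Objects as follows. This is an illustration under stated assumptions rather than StreamInsight code: the TollEvent class, the SlidingSums helper, the whole-minute timestamps, and the sample events are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SlidingViaSnapshotSketch
{
    // Hypothetical event shape; times are whole minutes from an arbitrary origin.
    public sealed class TollEvent
    {
        public int TollId;
        public int Start;    // inclusive
        public int End;      // exclusive
        public decimal Toll;
    }

    // Extends every lifetime by lookBack, then aggregates per booth over the
    // snapshot windows formed by the extended events.
    public static List<(int TollId, int Start, int End, decimal Sum, int Count)> SlidingSums(
        IEnumerable<TollEvent> input, int lookBack)
    {
        // Step 1: push each end time forward (the role AlterEventDuration plays).
        var extended = input
            .Select(e => new TollEvent
            {
                TollId = e.TollId, Start = e.Start, End = e.End + lookBack, Toll = e.Toll
            })
            .ToList();

        var results = new List<(int TollId, int Start, int End, decimal Sum, int Count)>();
        foreach (var booth in extended.GroupBy(e => e.TollId).OrderBy(g => g.Key))
        {
            // Step 2: snapshot boundaries are the distinct end points within the group.
            var points = booth.SelectMany(e => new[] { e.Start, e.End })
                              .Distinct().OrderBy(t => t).ToList();
            for (int i = 0; i + 1 < points.Count; i++)
            {
                int start = points[i], end = points[i + 1];
                var members = booth.Where(e => e.Start < end && e.End > start).ToList();
                if (members.Count == 0) continue; // gaps produce no output
                // Step 3: aggregate over the events alive in this snapshot.
                results.Add((booth.Key, start, end, members.Sum(e => e.Toll), members.Count));
            }
        }
        return results;
    }

    // Made-up sample data, not the events from the figures.
    public static List<TollEvent> SampleEvents() => new List<TollEvent>
    {
        new TollEvent { TollId = 1, Start = 0, End = 1, Toll = 4m },
        new TollEvent { TollId = 1, Start = 1, End = 2, Toll = 6m },
        new TollEvent { TollId = 2, Start = 1, End = 2, Toll = 5m },
    };

    public static void Main()
    {
        foreach (var r in SlidingSums(SampleEvents(), lookBack: 1))
            Console.WriteLine($"booth {r.TollId} [{r.Start},{r.End}) sum={r.Sum} count={r.Count}");
    }
}
```

In the sample data the two booth 1 events do not overlap as recorded. Only after each lifetime is extended by one minute do they share the snapshot [1,2), where the sum covers both events. That shared snapshot is the "look back one minute" effect.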
Figure 13. Group and Apply with Snapshot windows
The time plot for this input is shown in Figure 13, based on the same input considered for hopping windows. We have split the input stream into two groups based on the TollId as a partitioning key, and they are shown in blue (for TollId 1) and red (for TollId 2) on separate tracks. As before, windows are depicted as green line segments, but this time we had to draw windows for each group separately since they constitute a snapshot in time for a particular group and so are different across groups. The dotted blue and red extensions of the original events show how the event duration (i.e., the lifetime of the event) has been altered or extended by one minute into the future. The intended results of the query are shown in the time plot in Figure 14. And just like before, result events are color coded according to the toll booth to which they belong, as well as annotated with the resulting toll sum and set of events contributing to it.
Some observations:
• The first thing you’ll notice is that the number of output events is reduced by a third compared to hopping windows. The smaller window size of 1 minute compared to 3 minutes in hopping windows does not matter since this event duration is being uniformly applied to all events. The output stream is event driven – i.e., generated only at changes in the input. This implies that at any point in time, you see the most recent aggregate value.
• Each window has at least one new event, rather than a repetition of the same set of events from the past window. This implies that each aggregate computation retains just enough state, and the most current state, which is significant when you consider thousands or millions of groups (we discussed this in the introduction to Group and Apply).
• The output reflects the most current event-driven state of the stream by looking back over a specified time period. From this perspective, a snapshot window combined with AlterEventDuration is the nearest equivalent to a sliding window in StreamInsight, in that it represents the latest output of the stream at any point in time. We say “nearest equivalent” because a pure sliding window returns an output at every tick of advancing time.
Figure 14. Output events from Group and Apply with snapshot windows
Figure 15. Query graph for [Partitioned Snapshot window] query
Step 3 Gather elements of query logic to compute the output and develop an event flow graph.
• Group and Apply – we need a grouping operator to group the events in the stream based on Booth ID.
• Window – we choose a snapshot window – as per requirement.
• Sum() – to return the summed toll amount within a window.
• Alter Event Duration – to extend the EndTime stamp of the event by one minute.
The event flow graph showing the query with the above components is shown in Figure 15.
Step 4 Compose the query as a streaming transformation of input to output.
By now, the query should be very intuitive to understand. Every event has its end time advanced by a minute, and the stream is grouped by TollId into per-TollBooth streams. (The query applies the duration change to the input stream before grouping; because the extension is applied uniformly to every event, the effect is the same as applying it to each group.) A snapshot window is then applied on each of these streams. Within each snapshot window, we compute the aggregates Sum() and Count(). A second statement, shown after the query, transforms the output events into Point events.
var query = from e in inputStream.AlterEventDuration(
                e => e.EndTime - e.StartTime + TimeSpan.FromMinutes(1))
            group e by e.TollId into perTollBooth
            from win in perTollBooth.SnapshotWindow()
            select new Toll
            {
                TollId = perTollBooth.Key,
                TollAmount = win.Sum(e => e.Toll),
                VehicleCount = win.Count() // computed as a bonus, not asked in the query
            };
One can now easily compute a moving average. First transform the output events into Point events and then, given the sum and the count, compute the moving average of toll collected through a given tollbooth, as follows:
var partitionedMovingAverage = from e in query.ToPointEventStream()
                               select new TollAverage
                               {
                                   TollId = e.TollId,
                                   AverageToll = e.TollAmount / e.VehicleCount
                               };