StreamInsight is a temporal query processing engine, and one of its key capabilities is correlating event occurrences with respect to time and their payload values. Many everyday queries involve a temporal correlation: “What was the S&P index when this stock went up by 15%?”, “When did this incident happen (before or after another event)?”, “What caused this to happen?”, “What should happen when events A, B and C happen in that order (a pattern)?” The JOIN operation enables this correlation between event streams in StreamInsight.
The Join operation in StreamInsight is temporal. A Join between two events, one from each stream, is valid:
(a) for the duration of overlap of the lifetimes of the events, and
(b) only when the payload values of the events satisfy the join predicate.
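The two conditions above can be illustrated with a minimal sketch in plain C# LINQ over in-memory lists. This is not the StreamInsight API: the `CountEvent` record, the integer timeline, and the half-open lifetime `[Start, End)` are assumptions made only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical interval event: valid over [Start, End), carrying a vehicle count.
public sealed record CountEvent(int Start, int End, int VehicleCount);

public static class TemporalJoinSketch
{
    // Emit one output per pair whose lifetimes overlap AND whose payloads
    // satisfy the predicate. The output lifetime is the intersection of the
    // two input lifetimes.
    public static IEnumerable<CountEvent> Join(
        IEnumerable<CountEvent> left,
        IEnumerable<CountEvent> right,
        Func<CountEvent, CountEvent, bool> predicate)
    {
        return from l in left
               from r in right
               let start = Math.Max(l.Start, r.Start)
               let end = Math.Min(l.End, r.End)
               where start < end && predicate(l, r)
               select new CountEvent(start, end, l.VehicleCount);
    }

    public static void Main()
    {
        // Made-up sample data, not taken from the toll booth dataset.
        var booth1 = new[] { new CountEvent(0, 4, 2), new CountEvent(4, 9, 3) };
        var booth2 = new[] { new CountEvent(2, 6, 2), new CountEvent(6, 8, 3) };

        foreach (var e in Join(booth1, booth2, (l, r) => l.VehicleCount == r.VehicleCount))
            Console.WriteLine($"[{e.Start}, {e.End}) count={e.VehicleCount}");
    }
}
```

With this sample data the sketch prints `[2, 4) count=2` and `[6, 8) count=3`. The pair with lifetimes `[4, 9)` and `[2, 6)` overlaps in time but is dropped because the counts differ, which is condition (b) at work.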
Let's consider this simple application.
[Inner Join] “Report the output whenever Toll Booth 2 has processed the same number of vehicles as Toll Booth 1, computed over the last 1 minute, every time a change occurs in either stream.”
Step 1 Model input and output events from the application’s perspective.
We will use the input stream used for [Partitioned Sliding Window].
Step 2 Understand the desired query semantics by constructing sample input and output event CHTs.
Assume that we are able to separate the streams for Toll Booth 1 and 2. The objective is to correlate the first stream against the second, and generate an output whenever the count of vehicles in the two streams is the same.
The time plot for this input is shown in Figure 16. The phrase “every time a change occurs in either stream” motivates us to define a snapshot window on the streams. To compute the count over the last 1 minute, we need to extend event end times by 1 minute, just as in previous examples (depicted as dotted-line extensions). Finally, we need to filter the results separately for toll booth 1 and toll booth 2. The resulting stream for toll booth 1 is depicted in blue in the top track, and for toll booth 2 in red in the bottom track. Each track shows the snapshot windows in green, annotated for convenience with the count for that window. The Join operation then compares the two streams. The final result is depicted in the middle track and represents the overlapping intervals where the count of vehicles for toll booth 1 matches the count for toll booth 2. Again, participating events are included in the event annotations for convenience.
Step 3 Gather elements of query logic to compute the output and develop an event flow graph.
In [Partitioned Sliding Window], the ability of the Group and Apply operator to partition events based on a key enabled us to group the events by TollId, and compute the counts for every snapshot window. In this example, we will replicate that stream, and keep only the events for TollId 1 in one replica and only the events for TollId 2 in the other. Then we will correlate these two streams using a join based on the event count. StreamInsight provides a multicast operator to replicate streams.
Step 4 Compose the query as a streaming transformation of input to output.
We’ll reuse the logic from [Partitioned Sliding Window] to get a stream of per toll booth aggregations. The two streams stream1 and stream2 result from filtering for the events of toll booth 1 and toll booth 2 respectively. Each filter is applied over a replica of the partitionedSlidingWindow stream, generated as a result of the Multicast operation. We then correlate these two streams using a Join operation based on the vehicle count from both streams.
Figure 16. Inner Join between two streams (equal vehicle count is the JOIN predicate)
var stream1 = from e in partitionedSlidingWindow
              where e.TollId == "1"
              select e;

var stream2 = from e in partitionedSlidingWindow
              where e.TollId == "2"
              select e;

var query = from e1 in stream1
            join e2 in stream2
            on e1.VehicleCount equals e2.VehicleCount
            select new TollCompare
            {
                TollId_1 = e1.TollId,
                TollId_2 = e2.TollId,
                VehicleCount = e1.VehicleCount
            };
The Multicast operator is implicitly invoked as a consequence of the LINQ statements that define stream1 and stream2. Note that any number of such replicas can be generated. The JOIN that you see above is a case of inner join.
A cross join (aka Cartesian join) involves combining payload fields of every event in the first stream with payload fields of every event from the other, joined stream. It has no join condition.
var crossJoin = from e1 in stream1
                from e2 in stream2
                select new TollCompare
                {
                    TollId_1 = e1.TollId,
                    TollId_2 = e2.TollId,
                    VehicleCount = e1.VehicleCount
                };
Cartesian joins bring up the specter of an explosion of rows in relational systems. In this case, that risk exists only if the events that are joined also have exactly overlapping lifetimes on an event-by-event basis.
StreamInsight natively supports Cross Join, Inner Join, and Anti Join. We will take some time to discuss how we can realize other join types from these three primitives.
A theta join derives from inner join, where the join condition is something other than equality of values in the specific fields. Consider this query as an example.
[Theta Join] “Report the output whenever Toll Booth 2 has processed fewer vehicles than Toll Booth 1, computed over the last 1 minute, every time a change occurs in either stream.”
var query = from e1 in stream1
            from e2 in stream2
            where e1.VehicleCount > e2.VehicleCount
            select new TollCompare
            {
                TollId_1 = e1.TollId,
                TollId_2 = e2.TollId,
                VehicleCount = e1.VehicleCount
            };
An equi join is a special case of the theta join – where the join condition is an equality of values in specific fields from the payload of both streams. This is the example we saw above in [Inner Join].
A self-join derives from inner join: it is a join of a stream to itself. Combined with multicast, it can be used to correlate different fields in the payload (which by itself, one might argue, does not require a join) with different temporal constraints imposed on each stream. A commonly recurring pattern in streaming queries is to replicate a stream using a multicast, have one replica compute an aggregate or perform another complex operation, use the other replica, typically with temporal transformations applied, and then combine the two results to achieve the desired application semantics.
A common need in streaming applications is the ability to detect non-occurrences of events. Relevant to our toll station scenario, note that several toll stations offer automated tollbooths. In the US, a network of highways provides this service under the name EZPass. Commuters who subscribe to this service can drive through the automated tollbooths with less delay. A scanner in the automated tollbooth reads the tag whenever the vehicle passes through one of the toll booths. Given this background, a major pain point for EZPass is lost revenue due to two kinds of toll violations: (1) toll evasion, where the vehicle that passes through the toll does not have a tag, and (2) speeding, where the vehicle passes through the toll too fast. We will look at the toll evasion case here.
In general terms, we detect non-occurrence in the following manner:
1. Define a reference stream of events, each characterizing the occurrence of something of interest.
2. Compare the stream that is being observed against this reference stream on a continuous basis.
3. Report an event from the reference stream only when the observed stream has no corresponding event (i.e., a non-occurrence).
Figure 17. Left Anti Join in action
Consider the query:
[Left Anti Join] “Report toll violators – owners of vehicles that pass through an automated toll booth without a valid EZ-Pass tag read”.
Let's walk through the query building process.
Step 1 Model input and output events from the application’s perspective.
One of these will be the reference stream – it indicates the physical passage of a vehicle through the tollbooth – where an event will be generated whenever a vehicle crosses the toll booth. The other will be the observed stream – it represents a valid tag read – where an event will be generated whenever a valid tag read occurs when the vehicle crosses the tollbooth.
The start time of the point event in the reference stream will be the point in time when the pressure sensor underneath the asphalt at the entry to the toll booth senses a vehicle. For the sake of simplicity, let's assume that the hardware in the toll station is fast enough to read the tag on the windshield at the exact point in time when the vehicle itself is sensed. So the start time of the point event in the observed stream will be the same, provided it was a valid read. If the tag is missing, expired, or invalid, no event is generated.
Step 2 Understand the desired query semantics by constructing sample input and output CHTs.
The top track of the time plot in Figure 17 shows the reference stream, with events named eRi, each indicating the passage of a vehicle through toll booth 1. The middle track shows the observed stream, with events named eOi, each indicating a valid tag read at toll booth 1. The bottom track shows the output events; each of these indicates a tag violation.
Step 3 Gather elements of query logic to compute the output and develop an event flow graph.
• The reference stream is the left branch of any join that we propose to use here; the observed stream is the right branch.
• We need a join type that outputs an event from the reference stream whenever an event in the left (reference) stream of the join does NOT find a matching event in the right (observed) stream. A non-match is defined as the points in time:
1. when an event does not exist on the observed stream;
2. when the event’s payload does not satisfy the join predicate.
This example demonstrates case 1 above. The lifetime of this output event will be the duration over which the two events do NOT overlap – which implies, again, a point event. [Left Anti Join] demonstrates this behavior.
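The point-event behavior described above can be sketched in plain C# over in-memory lists. This is only an illustration of the semantics, not the StreamInsight operator itself: the `PointEvent` record, the integer timestamps, and the license plate payload are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical point event: a single timestamp plus the vehicle's license plate.
public sealed record PointEvent(int Time, string LicensePlate);

public static class AntiJoinSketch
{
    // Keep each reference event for which NO observed event exists at the same
    // point in time that also satisfies the predicate.
    public static IEnumerable<PointEvent> LeftAntiJoin(
        IEnumerable<PointEvent> reference,
        IEnumerable<PointEvent> observed,
        Func<PointEvent, PointEvent, bool> predicate)
    {
        var observedList = observed.ToList();
        return reference.Where(r =>
            !observedList.Any(o => o.Time == r.Time && predicate(r, o)));
    }

    public static void Main()
    {
        // Made-up sample data: three vehicles pass, only two valid tag reads occur.
        var passages = new[]
        {
            new PointEvent(1, "A"), new PointEvent(3, "B"), new PointEvent(5, "C")
        };
        var tagReads = new[] { new PointEvent(1, "A"), new PointEvent(5, "C") };

        // A predicate of "always true" means only the timestamps are compared.
        foreach (var v in LeftAntiJoin(passages, tagReads, (left, right) => true))
            Console.WriteLine($"violation at t={v.Time}: {v.LicensePlate}");
    }
}
```

With this sample data the sketch prints `violation at t=3: B`, the one passage that has no tag read at the same point in time.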
Step 4 Compose the query as a streaming transformation of input to output.
// Simulate the reference stream from inputStream itself - convert it to a point event stream
var referenceStream = from e in inputStream.ToPointEventStream()
                      select e;

// Simulate the tag violations in the observed stream by filtering out specific
// vehicles. Let us filter out all the events in the dataset with a Tag length of 0.
// In a real scenario, these events will not exist at all – this simulation is only
// because we are reusing the same input stream for this example.
// The events that were filtered out should be the ones that show up in the output of Q7
var observedStream = from e in inputStream.ToPointEventStream()
                     where 0 != e.Tag.Length
                     select e;

// Report tag violations
var query = referenceStream.LeftAntiJoin(observedStream, (left, right) => true);
Note that there is no requirement that the reference stream and the observed stream consist of events with the same duration for Left Anti Join to work. For cases where these events have different lifetimes, the output from Left Anti Join will have a lifetime that corresponds to the non-overlapping areas of the two lifetimes. It is, however, important that you make sure that the output behavior correctly reflects your application semantics – here is where the above process will be immensely helpful.
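To make the "non-overlapping areas" concrete, here is a small plain C# sketch that subtracts observed lifetimes from a single reference lifetime. It assumes half-open lifetimes `[Start, End)` on an integer timeline and a join predicate that is always true. The `Interval` record and the `Uncovered` helper are hypothetical names, not part of StreamInsight.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical lifetime: half-open interval [Start, End) on an integer timeline.
public sealed record Interval(int Start, int End);

public static class AntiJoinLifetimeSketch
{
    // Return the parts of the reference lifetime that no observed lifetime covers.
    public static IEnumerable<Interval> Uncovered(
        Interval reference, IEnumerable<Interval> observed)
    {
        int cursor = reference.Start; // everything before cursor is already handled
        foreach (var o in observed.OrderBy(x => x.Start))
        {
            if (o.End <= cursor) continue;        // ends before the cursor: ignore
            if (o.Start >= reference.End) break;  // starts after the reference ends
            if (o.Start > cursor)
                yield return new Interval(cursor, o.Start); // gap before this event
            cursor = Math.Max(cursor, o.End);
            if (cursor >= reference.End) yield break;       // fully covered from here
        }
        if (cursor < reference.End)
            yield return new Interval(cursor, reference.End); // trailing gap
    }

    public static void Main()
    {
        // Made-up sample data.
        var reference = new Interval(0, 10);
        var observed = new[]
        {
            new Interval(2, 4), new Interval(3, 6), new Interval(8, 12)
        };

        foreach (var gap in Uncovered(reference, observed))
            Console.WriteLine($"[{gap.Start}, {gap.End})");
    }
}
```

With this sample data the sketch prints `[0, 2)` and `[6, 8)`: one reference event yields two output lifetimes, one per stretch of time in which no observed event is present.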
Note that there was no payload considered in the determination of the join – this is another axis on which you could pivot the join behavior. Also note that we did not specify any window of time over which to make this correlation. You can easily compose another query based on [Left Anti Join] to “find out the number of violations over the past 1 hour” using the count and windowing constructs we discussed earlier.
Left Anti Join is the third join type natively supported in StreamInsight.