2. Tutorial

We will start this section with a tutorial example of an application that shows the basic scaffolding required around a StreamInsight query. Keeping this application relatively constant, we will progress through query capabilities, from the simple to the more complex. To get started, install/enable .NET 4.0 Full Edition, StreamInsight, and Visual Studio 2010 on your Windows 7 machine. Download the code from the same location as this paper, and load up HitchHiker.sln in Visual Studio. Besides the code, keep the documentation on StreamInsight, C#, and LINQ within reach. Keep a checkered notepad and a pencil handy to draw time plots.

2.1 “Hello, Toll!” – Programming a StreamInsight application


A tolling station is a common phenomenon – we encounter them in many expressways, bridges, and tunnels across the world. Each toll station has multiple toll booths, which may be manual – meaning that you stop to pay the toll to an attendant, or automated – where a sensor placed on top of the booth scans an RFID card affixed to the windshield of your vehicle as you pass the toll booth. It is easy to visualize the passage of vehicles through these toll stations as an event stream over which interesting operations can be performed.

In HelloToll.cs, we show an application built around a set of demo queries, which we will go over in more detail later in this paper. The “Program plumbing” region demonstrates the basic steps needed to run a query in StreamInsight. We will walk through this example step by step. Some code regions that are not immediately relevant to understanding how a query fits into an application are collapsed for brevity and appear as . . . in the example below.

. . .
static void TumblingCount(Application app)
{
    var inputStream = app.GetTollReadings();
    var query = from win in inputStream.TumblingWindow(TimeSpan.FromMinutes(3))
                select win.Count();   // (3) query logic
    app.DisplayIntervalResults(query);
}
...

#region Program plumbing
...
static IQStreamable<TollReading> GetTollReadings(this Application app)
{
    return app.DefineEnumerable(() =>
        // Simulated readings data defined as an array.
        // IntervalEvent objects are constructed directly to avoid copying.
        new[]
        {
            IntervalEvent.CreateInsert(
                new DateTime(2009, 06, 25, 12, 01, 0),
                new DateTime(2009, 06, 25, 12, 03, 0),
                new TollReading   // (1) event payload
                {
                    TollId = "1",
                    LicensePlate = "JNB 7001",
                    State = "NY",
                    Make = "Honda",
                    Model = "CRV",
                    VehicleType = 1,
                    VehicleWeight = 0,
                    Toll = 7.0f,
                    Tag = ""
                }),
            ...
        })
        // Predefined AdvanceTimeSettings.IncreasingStartTime is used
        // to insert CTIs after each event, which occurs later than the previous one.
        .ToIntervalStreamable(e => e, AdvanceTimeSettings.IncreasingStartTime);   // (2) input stream
}

static void DisplayPointResults<TPayload>(
    this Application app,
    IQStreamable<TPayload> resultStream)
{
    // Define observer that formats arriving events as points to the console window.
    var consoleObserver = app.DefineObserver(() =>
        Observer.Create<PointEvent<TPayload>>(ConsoleWritePoint));

    // Bind resultStream to consoleObserver.
    var binding = resultStream.Bind(consoleObserver);   // (4) bind

    // Run example query by creating a process from the binding we’ve built above.
    using (binding.Run("ExampleProcess"))   // (5) run
    {
        ...
    }
}


We implement a running StreamInsight query essentially in five logical steps. For now, please focus on the code fragments – we will explain the rationale behind the various choices made in this application as a next step.

(1) Each event in StreamInsight consists of two components: the event payload, which reflects the data values carried by the event, and the event shape, i.e., the temporal nature of the event as it models an actual occurrence. The event shape defines the lifetime of the event, i.e., the duration for which the values in the payload last along the forward progression of time. In this example, TollReading is a C# class that represents the payload for events input into the query. The query output is of primitive event type long.

(2) Define the input stream of events as a function of event payload and shape. We defined the payload component in the above step. Next, we model the act of a vehicle crossing the toll booth as an interval of time (via the combination of the ToIntervalStreamable function and the IntervalEvent type). We complete the input stream definition by wrapping the sample data array into an enumerable sequence using the DefineEnumerable function. StreamInsight allows the query engine to interface with practically any input or output device using IEnumerable, IObservable, or StreamInsight-specific units called adapters.

(3) Define the query logic itself, which is expressed as a LINQ statement that declaratively describes the processing on events flowing into the engine through inputStream. The query produces an output stream of events with a payload of type long. We will discuss the query logic in the next section. Learning how to write such queries is the central focus of this paper. For now, the key takeaway is that query logic allows you to define the event processing intent purely based on your knowledge of input and output event payload and shape definitions, isolated from the physical aspects of event delivery and output.

(4) “Bind” the query logic to the consumer. The consumer in our example just outputs formatted events to the console using the simple observer consoleObserver, defined using the DefineObserver method. The query logic result is then bound to it using the Bind method. The output events are interpreted as point events because the PointEvent type is used in the DefineObserver and ConsoleWritePoint methods. The ConsoleWritePoint method provides the actual implementation for outputting the events to the console. All that remains is to create a process.

(5) Run the query. In order to start evaluation of the query logic, we need to create a process by calling the Run method on the binding. From this point on, unless it is stopped by a subsequent call to Dispose on this process, or abnormally terminated by some runtime condition, this standing query will continue to run in perpetuity.
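As a minimal sketch of the payload described in the first step above, the payload can be pictured as a plain C# class. The property names below are taken from the object initializer in GetTollReadings; the property types (in particular float for VehicleWeight) and the PayloadSketch helper class are assumptions made for illustration, not the definition shipped with HelloToll.cs. The sketch uses no StreamInsight types.

```csharp
using System;

// Hypothetical payload sketch. Property names follow the initializer shown
// in GetTollReadings; the property types are assumptions.
public class TollReading
{
    public string TollId { get; set; }
    public string LicensePlate { get; set; }
    public string State { get; set; }
    public string Make { get; set; }
    public string Model { get; set; }
    public int VehicleType { get; set; }
    public float VehicleWeight { get; set; }
    public float Toll { get; set; }
    public string Tag { get; set; }
}

public static class PayloadSketch
{
    // Builds the first sample reading from the tutorial data.
    public static TollReading FirstReading()
    {
        return new TollReading
        {
            TollId = "1", LicensePlate = "JNB 7001", State = "NY",
            Make = "Honda", Model = "CRV", VehicleType = 1,
            VehicleWeight = 0, Toll = 7.0f, Tag = ""
        };
    }

    public static void Main()
    {
        // The shape is kept separate from the payload: here it is simply
        // the interval during which the vehicle occupies the booth.
        DateTime start = new DateTime(2009, 6, 25, 12, 1, 0);
        DateTime end = new DateTime(2009, 6, 25, 12, 3, 0);
        TollReading reading = FirstReading();
        Console.WriteLine("{0} at booth {1}: {2} minute(s)",
            reading.LicensePlate, reading.TollId, (end - start).TotalMinutes);
    }
}
```

Keeping the payload free of any timestamps mirrors the split described above: the same payload class could, in principle, be paired with a point or an interval shape without changing its fields.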

This programming model enables a flexible and rapid Dev-Deploy cycle. The concept of a query template allows you to declaratively express your business logic, isolated from concerns of event delivery and output. The combination of LINQ with C# allows you to code both the declarative and procedural aspects in a single programmatic environment. The Visual Studio IDE gives you the convenience of IntelliSense for code completion, code analysis, refactoring, and other features for rapid development. All the steps leading up to query start are resolved at compile time, minimizing the number of surprises to deal with at runtime.

While this simple example shows you anonymous entities, the full Object Model API allows you to register each discrete entity that is part of a process in the StreamInsight metadata. Given a shared knowledge of event payload and shape, query and source/sink developers can work independently to build and ship their modules as .NET assemblies. The product is based on a .NET platform, so deployment simply entails locating these assemblies with the StreamInsight libraries. An application integrator can write a wrapper program like HelloToll.cs that connects to a server, binds a given query logic with sources and sinks, and creates a process.

2.2 Query Development Process

In this section, we present a five-step process to be used as a general guideline for implementing your StreamInsight queries. The key goals of this process are to help you develop the right mindset when approaching a real-world problem, and to illustrate the use of StreamInsight query features in arriving at the solution. The steps are:

Step 1 – Model the payload and shape of input and output events from the application’s perspective
Step 2 – Understand the required query semantics by building sample input and output event tables
Step 3 – Gather elements of the query logic to develop an event flow graph and compute the output
Step 4 – Compose the query as a streaming transformation of the input to output
Step 5 – Specify the timeliness of query output in consideration of its correctness

We will repeat this process for each query we develop in this example.

Let's try to give a definition for the first query in the HelloToll.cs example:

[Tumbling Count] Every 3 minutes, count the number of vehicles processed at the toll station since the last result. Report the result at a point in time, at the end of the 3 minute window.

2.2.1 Step 1 – Model input and output events from the application’s perspective

StreamInsight allows you to model the event shape to reflect happenings in real life, as required by the application. As we discussed in the second step of the walkthrough in Section 2.1, we model the input event as an interval of time based on the assumption that many toll stations are manned, and we want to account for the few seconds to a minute of interaction between an attendant and a motorist. The StartTime and EndTime of each event mark a vehicle’s arrival at, and departure from, a toll booth respectively.

The output events containing the counts are modeled as Point events, reflecting the fact that we would know the count value only at the end of each 3 minute interval.

2.2.2 Step 2 – Understand required query semantics by building sample input and output event tables

StreamInsight queries function based on a strong foundational query algebra that treats every event as an interval event, irrespective of the shape defined at the programmatic level. In this step, we tabulate a representative sample of input events as the query engine would see them, in order to understand the query semantics.

A canonical history table (or CHT) is a simple tabulation of a set of sample events, with each event showing the StartTime and the EndTime of the event, along with the relevant payload fields. Here, given that the output is just a simple count of events (rather than some aggregate computation based on payload values), we skip showing the payload values. The variety of input interval durations helps in building an understanding of the query’s semantics. To further simplify our example we will consider only events generated by toll booth number 1.

Table 1. Input events canonical history table for example [Tumbling Count] for toll booth 1.

Event   StartTime   EndTime   Payload TollId   Payload VehicleId
e1      12:01       12:03
e2      12:02       12:03
e3      12:03       12:08
e4      12:07       12:10
e5      12:10       12:14
e6      12:11       12:13
e7      12:20       12:22
e8      12:22       12:25


It is useful to graph CHTs on a time plot. A time plot is a simple X/Y plot where the X axis marks the progression of time, and Y is simply a space to place the events as they arrive. The markings on the X axis denote minutes past 12:00, e.g., X=4 corresponds to 12:04.


Figure 1. Time plot for input events, a tumbling window of size 3, and result counting vehicles being processed

With the input events mapped on the time plot, we will build an output CHT that reflects the query requirements stated in [Tumbling Count] above. The problem requires a count to be reported once every three minutes. We therefore partition time into 3 minute intervals as shown on the time plot above. The green lines depict a tumbling window of size 3 minutes. Now, the problem statement is a bit ambiguous. Are we counting the number of vehicles which arrived during this period? Or perhaps the number of vehicles which were being processed at some point during that period? Or is it the number of vehicles which completed processing during that period? Such subtle ambiguities will be brought to light when we construct example output CHTs.

All intervals and windows in StreamInsight are closed at the left and open at the right, meaning that an event’s value at the start time of the interval or window is included in every computation, but the value at the end time is excluded. Since the result of a computation over a window can be known only at the end of the window interval, we want the output as of a point in time, showing the events processed over the window interval. From the query engine’s perspective, a point event is nothing but an interval event with a lifetime of one chronon (one .NET tick, equivalent to 100 nanoseconds, which we will represent as ε). In other words, if we had input events as of a point in time, we could represent them as interval events in the CHT, with each event having an EndTime of StartTime + ε. The results depicted in Figure 1 below the axis are points. We depict ε as a short line, smaller than a unit of time on the time plot. Each output event is annotated with the set of events over which it is computed, for easier cross-referencing.
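The half-open convention and the one-tick lifetime of a point event can be checked with a few lines of plain C#. This is an illustrative sketch only: HalfOpenSketch, Epsilon, and Overlaps are hypothetical names introduced here and are not part of the StreamInsight API.

```csharp
using System;

public static class HalfOpenSketch
{
    // One chronon: a single .NET tick (100 nanoseconds).
    public static readonly TimeSpan Epsilon = TimeSpan.FromTicks(1);

    // True when [aStart, aEnd) and [bStart, bEnd) share at least one instant.
    public static bool Overlaps(DateTime aStart, DateTime aEnd,
                                DateTime bStart, DateTime bEnd)
    {
        return aStart < bEnd && bStart < aEnd;
    }

    public static void Main()
    {
        DateTime w0 = new DateTime(2009, 6, 25, 12, 0, 0); // window [12:00, 12:03)
        DateTime w1 = w0.AddMinutes(3);                    // window [12:03, 12:06)
        DateTime w2 = w1.AddMinutes(3);

        // A point event at 12:03, represented as [12:03, 12:03 + ε).
        DateTime p = w1;
        Console.WriteLine(Overlaps(p, p + Epsilon, w0, w1)); // False
        Console.WriteLine(Overlaps(p, p + Epsilon, w1, w2)); // True
    }
}
```

A point that sits exactly on a window boundary therefore belongs to the window that starts there, not to the window that ends there.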

Let’s revisit the earlier problem definition to remove the ambiguity. The revised wording counts vehicles “that were being processed at some point during that period”:

[Tumbling Count] Every 3 minutes, report the number of vehicles that were being processed at some point during that period at the toll station. Report the result at a point in time, at the end of the 3 minute window.

The comments in the table on the right hand side show the events that contributed to the result during the associated window of time.

Table 2. Output CHT for [Tumbling Count]

Event   StartTime   EndTime      Payload Count   Events       Comment
o1      12:03       12:03 + ε    2               e1, e2       e3 excluded because window interval is open at the right
o2      12:06       12:06 + ε    1               e3           e1, e2 are excluded, since event interval is open at the right
o3      12:09       12:09 + ε    2               e3, e4
o4      12:12       12:12 + ε    3               e4, e5, e6
o5      12:15       12:15 + ε    2               e5, e6
-       12:18       12:18 + ε    -               -            Note that no event is produced for this window.
o6      12:21       12:21 + ε    1               e7
o7      12:24       12:24 + ε    2               e7, e8
o8      12:27       12:27 + ε    1               e8

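The output CHT above can be reproduced by hand or with a small simulation. The following is a minimal sketch in plain C# and LINQ to Objects, not StreamInsight code: it counts, for each 3 minute window, the Table 1 events whose half-open lifetime overlaps the window. The class and method names (TumblingCountSketch, CountOverlapping) are hypothetical, and times are simplified to whole minutes past 12:00.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TumblingCountSketch
{
    // Lifetimes of e1..e8 from Table 1 as [Start, End), minutes past 12:00.
    public static readonly int[][] Events =
    {
        new[] { 1, 3 },   new[] { 2, 3 },   new[] { 3, 8 },   new[] { 7, 10 },
        new[] { 10, 14 }, new[] { 11, 13 }, new[] { 20, 22 }, new[] { 22, 25 }
    };

    // Maps each window end to the number of events overlapping that window.
    // Windows that contain no events are omitted, as in the output CHT.
    public static SortedDictionary<int, int> CountOverlapping(int[][] events, int size)
    {
        var result = new SortedDictionary<int, int>();
        int horizon = events.Max(e => e[1]);
        for (int start = 0; start < horizon; start += size)
        {
            int end = start + size;
            // Half-open overlap test between [e[0], e[1]) and [start, end).
            int count = events.Count(e => e[0] < end && start < e[1]);
            if (count > 0) result[end] = count;
        }
        return result;
    }

    public static void Main()
    {
        foreach (var kv in CountOverlapping(Events, 3))
            Console.WriteLine("12:{0:00}  count = {1}", kv.Key, kv.Value);
    }
}
```

Under these assumptions the sketch yields counts of 2, 1, 2, 3, 2, 1, 2, 1 at 12:03, 12:06, 12:09, 12:12, 12:15, 12:21, 12:24, and 12:27, with no entry for 12:18, which agrees with Table 2.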

To provide some perspective, if we had decided to output the number of vehicles which arrived during a given 3 minute period, output events would have the following count values, taking only the event start times into account. The query required to generate this output will look very different.


Figure 2. Time plot for input, a tumbling window of size 3, and result counting arrival of the vehicles

Note that even though e3’s StartTime aligns with the first window’s end time, the window interval is open at the right, so e3 cannot be added to this window’s count. Also, result o3 does not include e3, even though e3 overlaps with this window, because this window doesn’t contain e3’s StartTime.
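To make the arrival-based variation concrete, here is a hedged sketch in plain C# that assigns each Table 1 event to the single window containing its StartTime. It is a simulation with LINQ to Objects rather than a StreamInsight query, and ArrivalCountSketch and CountArrivals are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ArrivalCountSketch
{
    // StartTime of e1..e8 from Table 1, in minutes past 12:00.
    public static readonly int[] StartMinutes = { 1, 2, 3, 7, 10, 11, 20, 22 };

    // Maps each window end to the number of events whose StartTime falls
    // inside the half-open window [end - size, end).
    public static SortedDictionary<int, int> CountArrivals(int[] starts, int size)
    {
        var counts = starts
            .GroupBy(s => (s / size + 1) * size)   // end of the window containing s
            .ToDictionary(g => g.Key, g => g.Count());
        return new SortedDictionary<int, int>(counts);
    }

    public static void Main()
    {
        foreach (var kv in CountArrivals(StartMinutes, 3))
            Console.WriteLine("12:{0:00}  arrivals = {1}", kv.Key, kv.Value);
    }
}
```

In this sketch e3, which starts at 12:03, is grouped into the window ending at 12:06, and the window ending at 12:09 contains only e4.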

If our interest was in the count of events that completed processing during a window, the output CHT would have the following counts.

Again, the query that needs to be composed to get this output would be very different. In fact let's consider two variations here.


Figure 3. Time plot for input, a tumbling window of size 3, and two variations of results counting completion of the vehicle processing

If we consider the EndTime of each event as a point and count these points, we end up with the result shown right below the input events on the time plot, which may seem odd for the problem definition: the vehicle processing represented by e1 and e2 is accounted for only at time 12:06. More plausible results are obtained if the EndTimes are shifted back by ε. This latter result is shown on the bottom-most track of the time plot. Notice that the only difference is in the very first result, o1. Let’s trace what happens to e1 and e2 here. If we consider the EndTime as a point representing the event, then the StartTime of each of these points is 12:03. As we have discussed, these points will not be accounted for in the first window; they will be picked up in the second window, causing the result to appear at 12:06. On the other hand, if the EndTime is pushed back by ε, these events are accounted for in the first window. Shifting event time by ε only affects computation for events that align with the window boundaries.
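The effect of the ε shift can be traced with a short simulation. The sketch below, in plain C# without StreamInsight, turns each Table 1 EndTime into a point and assigns it to a 3 minute window, once as is and once shifted back by one tick. CompletionCountSketch and CountCompleted are hypothetical names, and the tick arithmetic is an assumption about how one might model ε outside the engine.

```csharp
using System;
using System.Collections.Generic;

public static class CompletionCountSketch
{
    // EndTime of e1..e8 from Table 1, in minutes past 12:00.
    public static readonly int[] EndMinutes = { 3, 3, 8, 10, 14, 13, 22, 25 };

    // Maps each window end to the number of EndTime points in that window.
    // shiftTicks = 0 uses the EndTime itself; shiftTicks = 1 uses EndTime - ε.
    public static SortedDictionary<int, int> CountCompleted(
        int[] endMinutes, int size, long shiftTicks)
    {
        long sizeTicks = size * TimeSpan.TicksPerMinute;
        var result = new SortedDictionary<int, int>();
        foreach (int end in endMinutes)
        {
            long point = end * TimeSpan.TicksPerMinute - shiftTicks;
            int windowEnd = (int)(point / sizeTicks + 1) * size;
            int current;
            result.TryGetValue(windowEnd, out current); // 0 when key is absent
            result[windowEnd] = current + 1;
        }
        return result;
    }

    public static void Main()
    {
        foreach (long shift in new long[] { 0, 1 })
        {
            Console.WriteLine("shift = {0} tick(s)", shift);
            foreach (var kv in CountCompleted(EndMinutes, 3, shift))
                Console.WriteLine("  12:{0:00}  completed = {1}", kv.Key, kv.Value);
        }
    }
}
```

With these inputs, the two runs differ only in where e1 and e2 are reported: at 12:06 without the shift and at 12:03 with it. No other EndTime in the sample falls on a window boundary.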

Now that we have decided how we want the output to look, the next step is …

2.2.3 Step 3 – Gather elements of query logic to compute the output and develop an event flow graph

There are three main elements for this query:


Figure 4. Query Graph for the [Tumbling Count] query

Window – Windows are the mechanism by which events are grouped together into sets. In this case, we chose a 3 minute tumbling window, which means that our windows consist of consecutive non-overlapping 3 minute periods of time, as described above.

Count() – Computation done within a window – we count the number of events in each window using this built-in aggregate.

Project – We project the results out to the output stream, in this case, without any further transformations to the value.

The query graph is a simple sketch of the event flow through these operators. Chart the query graph from top to bottom to align yourself with the query graphs displayed by the Event Flow Debugger.

2.2.4 Step 4 – Compose the query as a streaming transformation of input into the output

Here is how the query reads: For each tumbling window win of size 3 minutes defined on inputStream, compute the count of events and project it out as an event with payload of type long.

var query = from win in inputStream.TumblingWindow(TimeSpan.FromMinutes(3))
            select win.Count();


2.2.5 Step 5 – Consider the timeliness balanced against correctness of query output

This is a critical step that requires you to specify how the output is delivered, based on an understanding of the behavior of the input event stream(s). We will discuss this step in detail under the section “13. Step 5 – Time and Commitment” later in the document. For now, please overlook the event entries marked “CTI” in the output display in the examples below.

2.3 Understanding the Query Output

The input events streamed into the HelloToll query are shown in Figure 5. Each line has StartTime and EndTime values, followed by the values of the payload fields. In the example program, input data is emulated as an array. The output events generated from an actual run are shown in Figure 6. In the output, each INSERT signifies an insertion of an event from the query into the output stream. Let's analyze the output. The first INSERT event signifies the output for the first tumbling window, reported with the EndTime of the window (12:03), and reflects a count value of 3, computed based on the inputs:

Event    StartTime            EndTime               TollId   LicensePlate   State   Make     Model     VehicleType   VehicleWeight   Toll   Tag
INSERT   6/25/2009 12:01:00   06/25/2009 12:03:00   1        JNB 7001       NY      Honda    CRV       1             0               7.0
INSERT   6/25/2009 12:02:00   06/25/2009 12:03:00   1        YXZ 1001       NY      Toyota   Camry     1             0               4.0    123456789
INSERT   6/25/2009 12:02:00   06/25/2009 12:04:00   3        ABC 1004       CT      Ford     Taurus    1             0               5.0    456789123


The second INSERT event with a count of 4 is the output for the second tumbling window, and includes the following events:

Event    StartTime            EndTime               TollId   LicensePlate   State   Make     Model     VehicleType   VehicleWeight   Toll   Tag
INSERT   6/25/2009 12:02:00   06/25/2009 12:04:00   3        ABC 1004       CT      Ford     Taurus    1             0               5.0    456789123
INSERT   6/25/2009 12:03:00   06/25/2009 12:07:00   2        XYZ 1003       CT      Toyota   Corolla   1             0               4.0
INSERT   6/25/2009 12:03:00   06/25/2009 12:08:00   1        BNJ 1007       NY      Honda    CRV       1             0               5.0    789123456
INSERT   6/25/2009 12:05:00   06/25/2009 12:07:00   2        CDE 1007       NJ      Toyota   4x4       1             0               6.0    321987654



Figure 5. Input events in HelloToll.cs


Figure 6. Output events from [Tumbling Count] in HelloToll.cs


Summary: In this section, we covered the following concepts:

1. The anatomy of a StreamInsight application and simple query.
2. A five step process for query development.
3. The use of canonical history tables to visualize the input and output, clarify a problem statement, and write the correct query.
4. An application can model an event as an interval or point event.
5. The query engine internally treats every event as an interval event with a start and end time.
6. Understanding the output of a simple query.