Simple aggregates such as Sum, Count, and Avg are built into the StreamInsight extensions for LINQ. If you need to compute more custom aggregates and functions, you can use one of the following extensibility mechanisms:
• User Defined Functions – useful for per-event computations. The function is executed for each incoming event and returns a value. It can be used wherever expressions can be used.
• User Defined Aggregate – useful for computing aggregates over a set of events. It returns a scalar value from a computation based on a set of events. Typically, the events that fall within a window of time constitute the set of events over which you are aggregating.
• User Defined Operator – useful for defining complete event transformers. A UDO processes a set of events and returns another set of events. In the most general terms, this feature can be used to build custom operators that perform unique operations not covered by the existing set of operators such as join, union, group and apply, project, and so on. The more common use case is to build time-sensitive and time-insensitive operations against a set of events.
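The three mechanisms differ mainly in what they consume and what they produce. The following sketch models only those shapes, using plain C# delegates over in-memory collections. It is not the StreamInsight API (the actual base classes, such as CepAggregate, appear later in this section). The class ExtensionShapes and its methods are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helpers that model only the input/output shapes of the three
// mechanisms over in-memory collections; they are not StreamInsight APIs.
public static class ExtensionShapes
{
    // UDF shape: invoked once per event; maps one payload to one value.
    public static IEnumerable<TOut> ApplyUdf<TIn, TOut>(IEnumerable<TIn> events, Func<TIn, TOut> udf) =>
        events.Select(udf);

    // UDA shape: invoked once per window; reduces the window's events to one scalar.
    public static TOut ApplyUda<TIn, TOut>(IEnumerable<TIn> window, Func<IEnumerable<TIn>, TOut> uda) =>
        uda(window);

    // UDO shape: invoked once per window; turns the window's events into a new set of
    // events, whose payload type may differ from the input's.
    public static IEnumerable<TOut> ApplyUdo<TIn, TOut>(
        IEnumerable<TIn> window, Func<IEnumerable<TIn>, IEnumerable<TOut>> udo) =>
        udo(window);
}
```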
Let's explore these capabilities with toll station based examples.
There are roughly 19 million EZ-Pass tags in circulation in the US (source). This surprisingly low number indicates that this technology has not yet captured the user’s imagination as something that could be a more useful commuting tool than a mere payment device. Depending on how the backend data store is designed, you can obtain different tag information (expired, reported-stolen, other) from the account database in anywhere from a few seconds to a minute.
The opportunities for immediate action – for customer service (email or mail notification, traffic updates, auto service reminders and promos “time for oil change, get 10% off at Firestone”), or prevention of misuse or crime (flagging authorities) – are limitless. What takes several days using traditional batch processing can be shortened to seconds or minutes after a vehicle crosses a toll station. Consider another variant of the toll violation query.
[UDF] For each vehicle that is being processed at an EZ-Pass booth, report the toll reading if the tag does not exist, has expired, or is reported stolen.
The query for the above problem statement will look like this:
var query = from e in inputStream
            where 0 == e.Tag.Length ||
                  TagInfo.IsLostOrStolen(e.Tag) ||
                  TagInfo.IsExpired(e.Tag)
            select new TollViolation
            {
                LicensePlate = e.LicensePlate,
                Make = e.Make,
                Model = e.Model,
                State = e.State,
                Tag = e.Tag,
                TollId = e.TollId
            };
The two user defined functions IsExpired and IsLostOrStolen are defined below.
using System;
using System.Linq;

public class TagInfo
{
    public string TagId { get; set; }
    public DateTime RenewalDate { get; set; }
    public bool IsReportedLostOrStolen { get; set; }
    public string AccountId { get; set; }

    // Assume this is the reference database; the user defined functions search against it.
    public static TagInfo[] Tags = new TagInfo[]
    {
        new TagInfo { TagId = "123456789", RenewalDate = new DateTime(2009, 2, 20), IsReportedLostOrStolen = false, AccountId = "NJ100001JET1109" },
        new TagInfo { TagId = "234567891", RenewalDate = new DateTime(2008, 12, 6), IsReportedLostOrStolen = true, AccountId = "NY100002GNT0109" },
        new TagInfo { TagId = "345678912", RenewalDate = new DateTime(2008, 9, 1), IsReportedLostOrStolen = true, AccountId = "CT100003YNK0210" }
    };

    // UDF: true if the tag is on record and has been reported lost or stolen.
    public static bool IsLostOrStolen(string tagId)
    {
        return Tags.Any(tag => tag.TagId == tagId && tag.IsReportedLostOrStolen);
    }

    // UDF: true if the tag is on record and its one-year term since renewal has ended.
    public static bool IsExpired(string tagId)
    {
        return Tags.Any(tag => tag.TagId == tagId && tag.RenewalDate.AddYears(1) < DateTime.Now);
    }
}
Note that these routines can be of arbitrary complexity. The simple array of TagInfo objects keeps this example self-contained within this program. In a production system, it will most likely be replaced by a LINQ to SQL query against a database, or by a lookup in an in-memory cache like Windows AppFabric.
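To see the per-event behavior of these UDFs outside of StreamInsight, the sketch below evaluates the same where-clause predicate with LINQ to Objects over an in-memory array. It is a minimal illustration under a few assumptions. The Reading type and the ViolationFilter class are hypothetical. The reference data is a subset of the sample array above. The reference time is passed in as asOf instead of being read from DateTime.Now, so that results are repeatable.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified reading type with only the fields the filter needs.
public class Reading
{
    public string LicensePlate { get; set; }
    public string Tag { get; set; }
}

public static class ViolationFilter
{
    // Hypothetical in-memory reference data: tag id -> (renewal date, lost/stolen flag).
    static readonly Dictionary<string, (DateTime Renewal, bool LostOrStolen)> Tags =
        new Dictionary<string, (DateTime Renewal, bool LostOrStolen)>
        {
            ["123456789"] = (new DateTime(2009, 2, 20), false),
            ["234567891"] = (new DateTime(2008, 12, 6), true),
        };

    // Per-event UDF: true if the tag is on record and flagged lost or stolen.
    public static bool IsLostOrStolen(string tagId) =>
        Tags.TryGetValue(tagId, out var info) && info.LostOrStolen;

    // Per-event UDF: true if the tag is on record and its one-year term ended before asOf.
    public static bool IsExpired(string tagId, DateTime asOf) =>
        Tags.TryGetValue(tagId, out var info) && info.Renewal.AddYears(1) < asOf;

    // Same predicate shape as the query's where clause, evaluated once per event.
    public static List<Reading> Violations(IEnumerable<Reading> readings, DateTime asOf) =>
        readings.Where(e => e.Tag.Length == 0 || IsLostOrStolen(e.Tag) || IsExpired(e.Tag, asOf))
                .ToList();
}
```

Passing the clock in as a parameter is a design choice for testability. The UDFs above read DateTime.Now directly.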
User defined functions are generally used in the filtering clause of the LINQ query. However, they can be applied wherever an expression is allowed, so they can also be used in Project operations. For example, consider a function that retrieves the VIN (vehicle identification number) of a vehicle given its license plate. A sample query might look like this:
var query = from e in inputStream
            where 0 == e.Tag.Length ||
                  TagInfo.IsLostOrStolen(e.Tag) ||
                  TagInfo.IsExpired(e.Tag)
            select new TollViolation
            {
                LicensePlate = e.LicensePlate,
                Make = e.Make,
                Model = e.Model,
                State = e.State,
                Tag = e.Tag,
                TollId = e.TollId,
                Vin = GetVinFromLicensePlate(e.LicensePlate) // UDF in a Project operation
            };
GetVinFromLicensePlate is a UDF that performs a lookup against some external store. Any other complex operation that returns a value could be used in its place.
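A UDF such as GetVinFromLicensePlate is just a static method that takes payload values and returns a value. The sketch below is hypothetical. The VehicleRegistry class, the in-memory dictionary that stands in for the external store, and the sample plate and VIN values are all made up for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class VehicleRegistry
{
    // Hypothetical stand-in for an external store (for example a registry service or a cache).
    // The plates and VINs are made-up sample values.
    static readonly Dictionary<string, string> VinByPlate =
        new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
        {
            ["ABC1234"] = "TESTVIN0000000001",
            ["XYZ9876"] = "TESTVIN0000000002",
        };

    // UDF shape: takes a payload field value and returns a single value usable in a select clause.
    // Returns null for an unknown (or null) plate instead of throwing.
    public static string GetVinFromLicensePlate(string licensePlate) =>
        licensePlate != null && VinByPlate.TryGetValue(licensePlate, out var vin) ? vin : null;
}
```

Returning null for an unknown plate, rather than throwing, is a design choice of this sketch.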
Out of the box, StreamInsight supports Min, Max, Avg, Sum, and Count. But there are several other custom aggregates that you may be interested in computing against an event stream. StreamInsight provides the capability to develop such aggregates via the extensibility mechanism of user defined aggregates.
In the eastern US, eastern New York, New Jersey, and Connecticut form what is called the Tri-State area, and southern and central New Jersey have a similar affinity with Pennsylvania. Given the population and geographic density, it is common for people to live in one state and work in another. Assume that for a given toll station, we want to compute the ratio of out-of-state vehicles to in-state vehicles.
Note that this example can be solved using a single query with simple counts of in-state, out-of-state, and total number of vehicles, and simply dividing them to obtain the result – it does not require the heavy machinery of a UDA. But the goal in this section is to show the different pieces of crafting a UDA, so we will consider this simple application logic.
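For comparison, the count-based approach amounts to bucketing readings into 3-minute tumbling windows and dividing two counts per window. The sketch below shows that arithmetic with LINQ to Objects rather than as a StreamInsight query. The TimedReading and CountBasedRatio names are hypothetical. Aligning windows to DateTime.MinValue is an assumption of this sketch, not a statement about how StreamInsight aligns its windows.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified reading: arrival time plus the state of the license plate.
public class TimedReading
{
    public DateTime Time { get; set; }
    public string State { get; set; }
}

public static class CountBasedRatio
{
    // Start of the fixed, non-overlapping window containing t. Windows are aligned to
    // tick 0 (DateTime.MinValue), so 3-minute windows start at 10:00, 10:03, 10:06, ...
    public static DateTime WindowStart(DateTime t, TimeSpan size) =>
        new DateTime(t.Ticks - t.Ticks % size.Ticks, t.Kind);

    // For each tumbling window: count in-state and out-of-state vehicles, then divide.
    // Float division yields Infinity (no exception) for a window with no in-state vehicles.
    public static SortedDictionary<DateTime, float> Compute(
        IEnumerable<TimedReading> readings, TimeSpan size, string homeState)
    {
        var result = new SortedDictionary<DateTime, float>();
        foreach (var window in readings.GroupBy(r => WindowStart(r.Time, size)))
        {
            int inState = window.Count(r => r.State == homeState);
            int outOfState = window.Count() - inState;
            result[window.Key] = (float)outOfState / inState;
        }
        return result;
    }
}
```

Windows with no readings produce no entry, because GroupBy only creates groups for keys that occur.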
[UDA] Over a 3-minute tumbling window, find the ratio of out-of-state vehicles to in-state vehicles being processed at a toll station.
The query for this problem statement will look like this:
var query = from win in inputStream.TumblingWindow(TimeSpan.FromMinutes(3))
            select win.UserDefinedAggregate<TollReading, OutOfStateVehicleRatio, float>(null);
OutOfStateVehicleRatio is a User Defined Aggregate that is exposed to the LINQ surface through the UserDefinedAggregate operator extension. The actual class that contains the UDA implementation looks as follows:
using System.Collections.Generic;

public class OutOfStateVehicleRatio : CepAggregate<TollReading, float>
{
    public override float GenerateOutput(IEnumerable<TollReading> tollReadings)
    {
        float outOfStateCount = 0;
        float inStateCount = 0;

        foreach (var tollReading in tollReadings)
        {
            // "NY" is treated as the home state of this toll station.
            if (tollReading.State != "NY")
            {
                outOfStateCount++;
            }
            else
            {
                inStateCount++;
            }
        }

        // Ratio of out-of-state to in-state vehicles. Float division does not throw:
        // a window with no in-state vehicles yields Infinity (NaN if the window is empty).
        return outOfStateCount / inStateCount;
    }
}
Some notes about the implementation of the OutOfStateVehicleRatio class:
• The input into this aggregate is a set of events with payloads of type TollReading, and the output is a value of type float. Both follow from the class deriving from CepAggregate<TollReading, float>: GenerateOutput receives an IEnumerable<TollReading> and returns a float.
• Alternatively, since whether a vehicle is out of state is determined by the payload field State of type String, we can define a more restrictive class derived from CepAggregate<string, float>, as shown below. The only changes from the previous class are the input type and the loop, which now iterates over State values directly.
public class OutOfStateVehicleRatio2 : CepAggregate<string, float>
{
    public override float GenerateOutput(IEnumerable<string> stateReadings)
    {
        float outOfStateCount = 0;
        float inStateCount = 0;

        foreach (var state in stateReadings)
        {
            if (state != "NY")
            {
                outOfStateCount++;
            }
            else
            {
                inStateCount++;
            }
        }

        return outOfStateCount / inStateCount;
    }
}
Another operator extension, UserDefinedAggregateWithMapping, takes an additional formal parameter. This parameter is an Expression that maps the payload to the type of a single payload field (the mapping can simply be a reference to a specific field in the payload itself). This lets us invoke the UDA with a lambda expression that maps the event payload to a string, in this case by simply referencing the State field. As a result, an IEnumerable collection of just the State values, rather than the entire event payloads, is sent into the UDA.
var query = from win in inputStream.TumblingWindow(TimeSpan.FromMinutes(3))
            select win.UserDefinedAggregateWithMapping<TollReading, OutOfStateVehicleRatio2, string, float>(
                e => e.State, null);
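The effect of the mapping expression can be sketched with LINQ to Objects. Each payload is projected to a single field before the aggregate runs, so the aggregate only ever sees values of that field's type. The TollPayload class and the MappedAggregate.Apply helper below are hypothetical and do not reproduce StreamInsight's implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical payload type with more fields than the aggregate needs.
public class TollPayload
{
    public string LicensePlate { get; set; }
    public string State { get; set; }
}

public static class MappedAggregate
{
    // Projects each payload through 'map' and passes only the projected values to
    // 'aggregate', the same narrowing that e => e.State applies to the UDA's input.
    public static TResult Apply<TPayload, TField, TResult>(
        IEnumerable<TPayload> window,
        Func<TPayload, TField> map,
        Func<IEnumerable<TField>, TResult> aggregate) =>
        aggregate(window.Select(map));
}
```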
UDAs are not incremental. Each time the aggregate is computed for a given event input, the UDA considers the full set of events that “belong” to the window, not just the events that changed. For most applications, however, the resulting performance is acceptable.
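The sketch below makes the distinction concrete. It contrasts a full recomputation over the window (the style a UDA uses) with a hypothetical incremental counter that updates running totals as events enter and leave the window. The IncrementalStateRatio class is an illustration only. The UDA base class described here does not provide anything like it.

```csharp
using System.Collections.Generic;

// Hypothetical incremental counterpart: keeps running counts and updates them as
// events enter or leave the window, instead of rescanning every event in it.
public class IncrementalStateRatio
{
    private readonly string homeState;
    private int inState, outOfState;

    public IncrementalStateRatio(string homeState) { this.homeState = homeState; }

    public void Add(string state)    { if (state == homeState) inState++; else outOfState++; }
    public void Remove(string state) { if (state == homeState) inState--; else outOfState--; }

    // O(1) per result: no pass over the window's events.
    public float Ratio => (float)outOfState / inState;

    // Non-incremental style, as a UDA works: an O(n) pass over the whole window every time.
    public static float Recompute(IEnumerable<string> window, string homeState)
    {
        int inS = 0, outS = 0;
        foreach (var s in window) { if (s == homeState) inS++; else outS++; }
        return (float)outS / inS;
    }
}
```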
User Defined Operators are useful for processing a set of events and returning another set of events. This can be used to build custom operators that perform unique operations not covered by the existing set of operators such as join, union, multicast, and so on. UDOs are also useful when you need to deal with the complete event structure, i.e., inclusive of the Start and End Time fields, rather than dealing just with payload fields.
Consider a hypothetical example where the toll station also acts as a weigh station for trucks and other commercial vehicles. If a vehicle is hauling a load that is not in line with its “class” (in terms of number of axles and similar parameters), weigh stations charge an extra toll for the additional tonnage. Assume that for each truck or commercial vehicle that passes through the EZ-Pass toll booth over a particular time period, the toll station has to compute this score and output an event that has a different structure. This event contains the vehicle’s tonnage, license plate, weight compliance score, and other information.
[UDO] Over a one-hour tumbling window, report all commercial vehicles with tonnage greater than one ton (2K lbs), along with their arrival times at the toll and any charges due to weight violation. Overweight charges during the rush hour (7am to 7pm) are double those during non-rush hours.
Step 1 Model input and output events from the application’s perspective.
We will use the same input events as discussed in previous examples, augmented with weight-related information for this example. The output event will be structurally different from the input event.
Step 2 Understand the desired query semantics by constructing sample input and output event CHTs.
This is similar to the tumbling window example we saw earlier. The emphasis in this example is on outputting events with a different payload type, whose start and end timestamps come from the individual input events rather than from the window.
Step 3 Consider the different elements of query logic required to compute the output above.
While the windowing constructs are simple, the output event structure depends on the timestamps of the input events. This requires a time-sensitive User Defined Operator.
Step 4 Compose the query as a streaming transformation of input to output.
var query = from win in inputStream.TumblingWindow(TimeSpan.FromHours(1))
            from e in win.UserDefinedOperator(() => new VehicleWeights())
            select e;
The UDO VehicleWeights is defined as follows. In this example, we have made the weight charge a constant value. It could instead be passed into the UDO implementation as a configuration parameter (for instance, if the weight charge varies every month or every week depending on some external factors). The constant only keeps the example simple.
using System.Collections.Generic;
using System.Linq;

public class VehicleWeights : CepTimeSensitiveOperator<TollReading, VehicleWeightInfo>
{
    // Charge per ton of vehicle weight; a constant here to keep the example simple.
    double weightcharge = 0.5;

    public override IEnumerable<IntervalEvent<VehicleWeightInfo>> GenerateOutput(
        IEnumerable<IntervalEvent<TollReading>> events, WindowDescriptor windowDescriptor)
    {
        List<IntervalEvent<VehicleWeightInfo>> output = new List<IntervalEvent<VehicleWeightInfo>>();

        // Identify commercial vehicles (VehicleType 2) heavier than one ton in this window.
        // VehicleWeight is assumed to be expressed in tons.
        foreach (var e in events.Where(r => r.Payload.VehicleType == 2 && r.Payload.VehicleWeight > 1))
        {
            // create an output interval event
            IntervalEvent<VehicleWeightInfo> vehicleWeightEvent = CreateIntervalEvent();

            // populate the output interval event
            vehicleWeightEvent.StartTime = e.StartTime;
            vehicleWeightEvent.EndTime = e.EndTime;
            vehicleWeightEvent.Payload = new VehicleWeightInfo
            {
                LicensePlate = e.Payload.LicensePlate,
                Weight = e.Payload.VehicleWeight,
                // Here is the interesting part: the output depends on the start timestamp of the
                // input event. The weight charge is a function of the rush hour definition
                // (7am up to 7pm), the weight charge factor, and the vehicle tonnage itself.
                WeightCharge = ((e.StartTime.Hour >= 7 && e.StartTime.Hour < 19) ? 2 : 1) * weightcharge * e.Payload.VehicleWeight
            };

            // collect the event; the list is returned through the IEnumerable interface
            output.Add(vehicleWeightEvent);
        }
        return output;
    }
}
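The core logic of VehicleWeights can be exercised without StreamInsight. The sketch below uses hypothetical stand-in types (Interval<T>, TruckReading, WeightCharge) instead of IntervalEvent<T> and the payload classes above. It assumes that vehicle type 2 means commercial (as the UDO's filter implies) and that weight is measured in tons. It shows the two properties that make this a time-sensitive operator: each output event keeps its input event's timestamps, and the charge depends on the input StartTime.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for StreamInsight's interval events and payload types.
public class Interval<T>
{
    public DateTime Start { get; set; }
    public DateTime End { get; set; }
    public T Payload { get; set; }
}

public class TruckReading
{
    public string LicensePlate { get; set; }
    public int VehicleType { get; set; }     // 2 = commercial (assumption)
    public double WeightTons { get; set; }   // assumed unit: tons
}

public class WeightCharge
{
    public string LicensePlate { get; set; }
    public double WeightTons { get; set; }
    public double Charge { get; set; }
}

public static class WeightOperator
{
    // Rush hour: 7am inclusive up to 7pm exclusive, i.e. hours 7 through 18.
    public static bool IsRushHour(DateTime t) => t.Hour >= 7 && t.Hour < 19;

    // Each qualifying input event yields one output event of a different payload type,
    // keeping the input's timestamps; the charge depends on the input StartTime.
    public static List<Interval<WeightCharge>> Apply(
        IEnumerable<Interval<TruckReading>> window, double chargePerTon)
    {
        return window
            .Where(e => e.Payload.VehicleType == 2 && e.Payload.WeightTons > 1.0)
            .Select(e => new Interval<WeightCharge>
            {
                Start = e.Start,
                End = e.End,
                Payload = new WeightCharge
                {
                    LicensePlate = e.Payload.LicensePlate,
                    WeightTons = e.Payload.WeightTons,
                    Charge = (IsRushHour(e.Start) ? 2 : 1) * chargePerTon * e.Payload.WeightTons
                }
            })
            .ToList();
    }
}
```

Treating 7pm as the first non-rush hour (a half-open range from 7:00 up to 19:00) is an interpretation of “7am to 7pm”.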
Some important points to consider from both the User Defined Aggregate and UDO discussions above:
• UDAs and UDOs can be defined to be either time-sensitive or time-insensitive. The UDA example is time-insensitive, and the UDO example is time-sensitive.
• The main difference between the two is that time-sensitive UDAs and UDOs expose the complete event to the implementation. This means that you can access the StartTime and EndTime of each event that “belongs to” or “falls into” a window of time, and you can define your output event to depend on these timestamps. In other words, both the timestamps and the payload fields of the output event can be based on the input event timestamps. In addition, a time-sensitive UDA or UDO can access the window’s start and end times (through the WindowDescriptor parameter of GenerateOutput in the UDO example).
In contrast, a time-insensitive UDO or UDA exposes only the payload to the implementation, so the output of these extensibility mechanisms can depend only on the payload field values of the incoming events.
• For a time-sensitive UDO or UDA, an important caveat is that ALL values in the implementation MUST be deterministic. In other words, you cannot use constructs like Random or DateTime.Now, whose values can change from one invocation of the UDA or UDO to the next.
• A UDO or UDA is invoked once per window, IF the events arrive into the UDO/UDA in order. We defer the discussion of UDO behavior with out-of-order events until after the next section.
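The difference between time-insensitive and time-sensitive extensions shows up in their inputs. In the hypothetical sketch below, the time-insensitive aggregate receives only State values. The time-sensitive one receives each event's StartTime plus the window bounds, and it derives its result purely from those inputs (no Random, no DateTime.Now). Recomputing it over the same window therefore gives the same answer. The Arrival and WindowBounds types stand in for StreamInsight's event and WindowDescriptor types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins; not StreamInsight types.
public class Arrival
{
    public DateTime StartTime { get; set; }
    public string State { get; set; }
}

public class WindowBounds
{
    public DateTime StartTime { get; set; }
    public DateTime EndTime { get; set; }
}

public static class AggregateKinds
{
    // Time-insensitive: sees only payload values, so the result can depend only on them.
    public static int CountOutOfState(IEnumerable<string> states) =>
        states.Count(s => s != "NY");

    // Time-sensitive: sees event timestamps and the window bounds. The result is a pure
    // function of those inputs, so re-running it over the same window is repeatable.
    // Returns 0.0 for an empty window.
    public static double AverageMinutesIntoWindow(IEnumerable<Arrival> events, WindowBounds window)
    {
        var offsets = events.Select(e => (e.StartTime - window.StartTime).TotalMinutes).ToList();
        return offsets.Count == 0 ? 0.0 : offsets.Average();
    }
}
```

Returning 0.0 for an empty window is an arbitrary choice that keeps the function defined for every input.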