Managing device alarms in Cumulocity IoT in a functional style

Apama Streaming Analytics version 10.15.3 and above. Cumulocity IoT Streaming Analytics version 10.18 and above.

The task

The task we’re going to tackle in this post is, given a list of devices in the form of an EPL sequence<ManagedObject>, to check for any active alarms for each of the devices and then clear all of those alarms. Finally, we want to know how many alarms need clearing and when all of those alarms have been cleared. Using just regular EPL constructs, you would need to:

  • Create some sequences to hold in-flight pending events:

      sequence<integer> outstandingIds := new sequence<integer>;
      sequence<Alarm> allAlarms := new sequence<Alarm>;
    
  • Iterate over the sequence of managed objects:

      ManagedObject o;
      for o in objects {
    
  • Send a bunch of FindAlarm requests:

      integer reqId := Util.generateReqId();
      send FindAlarm(reqId, { "source" : o.id }) to FindAlarm.SEND_CHANNEL;
      outstandingIds.append(reqId);
    
  • Listen for all the FindAlarmResponse events and count the events to be cleared:

      monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
    
      on all FindAlarmResponse(reqId=reqId) as resp and not FindAlarmResponseAck(reqId=reqId) {
          allAlarms.append(resp.alarm);
      }
    
  • Listen for all of the FindAlarmResponseAck events and record that we’ve received them:

      on FindAlarmResponseAck(reqId=reqId) {
          outstandingIds.remove(outstandingIds.indexOf(reqId));
    
  • Once all of them have been received, log that they’re all cleared:

      if outstandingIds.size() = 0 then {
           monitor.unsubscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL));
           Alarm a;
           for a in allAlarms {
               a.status := "CLEARED";
               send a to Alarm.SEND_CHANNEL;
           }
           log "Cleared "+allAlarms.size().toString()+" alarms from "+objects.size().toString()+" devices" at INFO;
      }
    

Functional programming

The EPL Functional Library provides various functional operations, such as filter, map and reduce, which operate on EPL container types. There are also several functor actions and predicates which can help you use these operators. There are two APIs for accessing the functional operators: com.apama.functional.Fn and com.apama.functional.Functional.

Firstly, you can use the Fn type, which provides static functions which operate on containers, for example:

sequence<integer> evens := <sequence<integer>> Fn.filter(numbers, Fn.even);

Secondly, you can use the Functional type, which wraps your container and provides the functional operators as instance methods, each one returning a new Functional wrapping the result container. This allows the chaining of multiple operators in a fluent style. For example, this code wraps the original sequence of numbers, filters out just the even numbers, and then adds them up, yielding a single integer as the result:

integer evenSum := <integer> Functional(numbers).filter(Fn.even).reduce(Fn.sum);

Besides operations on static containers, EPL is a language designed to handle streams of events. There are also functional-style ways of doing that. For example, if you have a variable number of outstanding operations and you need to wait for a completed event from each one before continuing you could use:

Functional(sequenceIDs).waitForAllCompleted( "Completed", "id", onCompleted).onTimeout(TIMEOUTSECS, onTimeout);

Partially binding functions

Functional programming, and APIs that rely on callbacks, often need to provide additional arguments to functions that will be invoked by the APIs. The EPL Functional Library allows you to partially bind arguments to actions and provide those as an argument to the API. This is similar to how a lambda would be used in other languages. For example:

Fn.map(["Bob", "Alice"], Fn.partial(Fn.concat, "Hello ")) // returns ["Hello Bob", "Hello Alice"]

Managing alarms functionally

We’re going to use all of the features above to implement the alarm clearing in a functional fashion. To do this we will write a single Functional chain, using a few simple helper actions. The main functional component looks like this:

monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);

sequence<Alarm> allAlarms := new sequence<Alarm>;
any _ := Functional(objects)
   .map(Fn.getEntry("id")) // get the id from each object.
   .map(sendFindAlarm) // send a find alarm for each, returning the ids.
   .map(Fn.partial(receiveAlarms, [allAlarms])) // listen for all those alarms.
   .waitForAllCompleted(FindAlarmResponseAck.getName(), "reqId", Fn.partial(clearAllAlarms, [allAlarms]));  // clear all the alarms we found.

This can be read as follows:

  • Take my list of managed objects and get the id from each one.

  • For each id, call the sendFindAlarm functor, which will send a request and return the request ids.

  • For each request id, call receiveAlarms, which will listen for all of the alarms from that object and build the allAlarms sequence.

  • Finally wait for all of the ids to be complete and call clearAllAlarms with the built up sequence of alarms.

There’s logic in the helper actions, but the functional style allows you to see the overall logic in a compact fashion together and then drill down into the details when you need it.

The sendFindAlarm action is very simple and self-explanatory:

action sendFindAlarm(string source) returns integer
{
    return <integer> Fn.sendToChannel(FindAlarm.SEND_CHANNEL,
            FindAlarm(Util.generateReqId(), {"source": source.toString() }))
            .getEntry("reqId");
}

The receiveAlarms function uses another part of the Functional API:

action receiveAlarms(sequence<Alarm> allAlarms, integer reqId) returns integer
{
     any _ := Fn.getAllEvents(FindAlarmResponse.getName(), {"reqId":<any>reqId},
                     FindAlarmResponseAck.getName(), {"reqId":<any>reqId},
                     Fn.partial(appendSequence, [allAlarms]));
     return reqId;
}

Here we’re asking Functional to collect together all of the responses received before the acknowledgement.

Finally, clearAllAlarms is also implemented as a functional call:

action clearAllAlarms(sequence<Alarm> allAlarms)
{
   monitor.unsubscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
   any _ := Functional(allAlarms)
                    .map(Fn.setEntry("status", "CLEARED"))
                    .map(Fn.partial(Fn.sendToChannel, Alarm.SEND_CHANNEL));
}

Further reading

You can see the full sample in the latest release of Apama in the samples along with the full documentation of using functional operations in EPL and the API doc.

Read full topic