synxgen

AN EFFICIENT SCHEDULE AND NOTIFICATION SYSTEM FOR IOT DEVICES

Schedule and Notification System :

Internet of Things (IOT) is one of the greatest innovations in Technology . Some of its applications are Home Automation, Surveillance, wearable devices, smart cars, etc. Schedule systems for these IOT devices have become so beneficial. We designed Schedule system for arranging, controlling and optimizing work and workloads like automating the turning of bulbs on/off, periodic watering of plants using smart devices, etc. Notification systems enhance the delivering of notifications as per event to the appropriate entity. Notifications are a way to notify you as a form of popup or message in email & sms about any events like friendly reminders, device failures, schedules success ,etc.

Connectivity is everywhere.

Lets see a use case of a schedule system :

Suppose I have an AC at home and I want to turn it OFF at 1 am and make my fan ON at the same time everyday. So the schedule system will schedule the AC to be OFF and fan ON at 1 am. everyday. And the notification system will notify about the event status. This is a simple use case for schedule and notification system. 

We can also schedule watering devices at nursery to water plants everyday at 7 in the morning and this makes our job easy.

Let’s take a problem statement :

Here we have a bunch of devices like AC, bulbs, fans and heater.We want to design a schedule system for these devices and get notifications for every scheduled event .

How can we interact with the devices :

Here, we have devices like AC, bulbs, fans and heater which are all connected to a single Atom8 device, which controls the other connected devices asynchronously. You can check the Atom8 devices here.

The image shows how the devices interact with cloud services for control.
Way to control devices

MQTT Protocol :

MQTT stands for MQ Telemetry Transport. It is a machine to machine / ‘Internet of Things’ connectivity protocol. It is a lightweight, publish-subscribe message transport network protocol that transports messages between devices.

MQTT Broker :

It is a server through which all the messages are passed hence, it acts like a middle man which supports pub-sub messaging pattern and all the devices are connected to it. It uses the topic of message and decides which client and device are getting messages.

MQTT Topics :

Topics refers to strings that brokers use to filter messages for each connected client and device.

This is a simple explanation for MQTT
MQTT Broker and Topics

The above diagram is an example of how devices interact. Broker contains two MQTT topics Topic 1 & Topic 2. Here it uses a pub-sub model. Device A subscribes to Topic 1 and Device B subscribes to Topic 2. 

Now when the client publishes a message ‘ON’ to Topic 1, the broker will send this message to all devices and clients that have subscribed to Topic 1. In current case the Device A will recieve message to and will turn itself On.

For more details, refer : https://medium.com/@slysterous/a-detailed-guide-to-the-world-of-mqtt-8155f7475dd5

AWS IOT and MQTT :

AWS IOT is a platform by Amazon Web services to manage ‘Internet Of Things’ devices. These devices are also termed as ‘Things’. Like other services AWS IOT can connect to other AWS cloud services making its application much wider. 

Things transmit messages through device gateway, where they update the ‘Device Shadow’. A device shadow (or a thing shadow) is an object/document holding information of device in AWS IOT Device Shadow Service and updates to change the state of the device . The thing shadow eliminates the need to request information from the device to retrieve its state and reduces network traffic. In our case the Atom8 device is a thing. We have to set up the thing on AWS IOT first. 

For use case, refer : https://medium.com/tensoriot/aws-iot-creating-your-first-cloud-bound-device-d8dca0695f43

Sending Message to a device :

AWS IOT uses MQTT broker to transmit messages between Cloud and Devices. Suppose we want to make our AC ‘On’. Atom8 device handles AC. So now we have to update device shadow (state of the device) using Rest calls. A message publishes to the current Atom8 thing topic of AWS IOT MQTT broker when there is an update in device shadow. Now our Atom8 device have already subscribed to that topic so the broker publishes that message to this Atom8 device and as per the message it makes our AC ‘On’.

A sample flow chart to change the state of device :

The prcoces here shows end to end system to change the device state.
Process to change the device state

Structure of Device Shadow :

Device Shadows can make a device’s state available to apps and other services. Every shadow has a reserved MQTT topic and HTTP URL that supports the get, update, and delete actions on the shadow.Shadows use JSON shadow documents to store and retrieve data. A shadow’s document contains a state property that describes these aspects of the device’s state:

  1. desired
    – This  specifies the desired states of device properties by updating the desired object.
  2. reported
    – Devices report their current state in the reported object.
  3. delta
    – AWS IoT reports differences between the desired and the reported state in the delta object.

Example Shadow Document :

{
    "state": {
        "desired": {
            "field1": integer2,
            "field2": "string2"
        },
        "reported": {
            "field1": integer1,
            "field2": "string1"
        }
    },
    "version": version
}

For more details about device shadow you can refer this : https://docs.aws.amazon.com/iot/latest/developerguide/device-shadow-document.html

So now we are familiar with the process of changing the state of a device. Let’s start with a simple approach to Schedule and Notification system i.e. Version 1.

Schedule and Notification System V1 :

In this simple approach, we will design an end to end system to run schedules and send notifications to appropriate entities.

Here we use AWS cloudwatch, Lambda, IOT and Send Notification services.We write API codes are in AWS Lambda.

As we have already discussed in detail about how we can change the device state using device shadow, Mqtt and Atom8 devices. In this design we will focus on changing the device shadow state in AWS IOT for schedules.

The basic schedule and notification system for IOT devices using AWS.
Schedule and Notification System Version1

Process :

  • To run the schedules we want to call ExecuteScheulde API  at appropriate time everyday. So for that we will use Events rule in AWS Cloud Watch. Here we will define a rule to trigger a event at scheduled time and call ExecuteSchedule with appropriate data about the update to device details and required state changes.
  • Every Atom8 device can handle multiple devices like fan & AC and a home can have multiple Atom8 devices.
  • So every schedule can have multiple events of the same Atom8 devices or combination of multiple Atom8 devices.
  • For ‘n’ events ExecuteSchedule API will make ‘n’ asynchronous calls to UpdateThingShadow API. So AWS will create ‘n’ instances of UpdateThingShadow and these instances will update the desired state of the respective thing’s device shadow in AWS IOT.
  • This updates in device shadow will publish messages to respective MQTT topics and topics will publish messages to subscribed devices.
  • Hence in this way we can run multiple events in a single schedule.
  • After that the ExecuteSchedule API will call SendNotification API which will create a notification body and also will list endpoints of device’s owners.
  • After the completion of the above work, SendNotification API will push Notification messages to the concerned SNS endpoints which will push notifications to concerned phones.

In this way we can automate a schedule and send notification. Here we can also set events in cloudwatch using code made with aws-sdk.

Disadvantages of above system :

  • When a device changes its state as per desired state it reports back with a reported state to AWS IOT results in updating reported state of device shadow of particular thing. In the above system we are not considering the reported state. So if a device fails to report back due to any issue, still the schedule will be considered successful.
  • All MQTT calls are asynchronous and the messages sometimes are not in order, so handling and checking the reported state is difficult.
  • Here we don’t consider any type of failure in the schedule event. The reason we can update the desired state is whether the device is online or offline, but the device only reports back when it’s online. So not considering the reported state can neglect failures of events.
  • Further we cannot notify users about successful, partial or failed execution of schedule.

To overcome these disadvantages we can design an improved Schedule and Notification system.

Schedule and Notification System V2 :

Here the main point of concern was to consider the reported state of the device also. To do that our system must have to wait for the device to report back so that we can analyze the reported state and see whether the scheduled event was successful.

So here AWS StepFunctions becomes very useful. StepFunctions consists of workflow of states. Our system can be divided into different states and we can define a workflow to run those states in a required manner. In StepFunctions we will work on two most important features which are Dynamic Parallelism and waitForTaskToken.

Dynamic Parallelism in step functions :

In Dynamic Parallelism in step functions, we can run identical parallel tasks as per input to the state. Here the number of parallel tasks is decided as per the length of the input array. It runs on scatter and gather model. The ‘Map’ state can be used in step functions to run the same steps for multiple entries of the input array in parallel.

For examples, refer : https://docs.aws.amazon.com/step-functions/latest/dg/tutorial-creating-map-state-machine.html

waitForTaskToken’ task in step functions :

These types of tasks can be used when we need a callback from an external service. So here we can call an external service with a taskToken included in the input. Here the workflow will stop and wait for taskToken to be returned with ‘sendTaskSuccess’ after which the task will be considered finished and the workflow will resume. ’sendTaskSuccess’ along with taskToken can be sent using aws-sdk in any programming language.

Example of waitForTaskToken task :

"sample wait state": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda::invoke.waitForTaskToken",
  "Parameters": {
    "Function": "lambdaFunctionName",
    "Payload": {
        "input": "inputString",
        "TaskToken.$": "$$.Task.Token"
     }
  },
  "Next": "NEXT_STATE"
}

For usecases you can refer : https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html

In our system we will be using a combination of Dynamic Parallelism and waitForTaskToken feature in step functions to wait for the reported state to return.

First we will execute the ExecuteScheduleWorkflow step function :

The process to initiate the schedule and notification system
Process to execute ‘ExecuteScheduleWorkflow’ step function

Here the Cloudwatch event rule will trigger ExecuteSchedule API with appropriate input at required time. And ExecuteSchedule will start ExecuteScheduleWorkflow step function.

An advanced overview showing the end to end working of schedule and notification system.
Working of ExecuteScheduleWorkflow stepfunction

Now further we will see the ExecuteScheduleWorkflow Step Function in detail.

The workflow above is designed in AWS Step Functions using Amazon states language (asl). It is similar to json and we can define states and their flow in them. You can see we can call external cloud services like lambda functions within the states.

Note : Here the states are basically the group of tasks , which runs when step function workflow running is in that state.

So lets go state by state :

State 1 : Enable Dynamic Parallelism and Update Thing shadow

  • Here suppose we have 3 events in a schedule. These events can be on the same device or different devices. So we will enable a Map task here where there will be an input array of 3 events. 
  • We will define a task called ‘UpdateShadow’ inside the map state. This map state will execute the same task ’UpdateShadow’ for every element(event) of the input array to the map state. That means for our example 3 parallel UpdateShadow tasks will run concurrently.
  • Let’s define this ‘UpdateShadow’ task

‘UpdateShadow’ task :

  • First it will create a subtask of type ‘waitForTaskToken’. This subtask will call ‘UpdateThingShadow’ lambda function with input and taskToken.
  • taskToken will be unique for every task.
  • Here this subtask will wait for taskToken to be returned within ‘n’ secs.
  • In our case there will be 3 tasks each in a parallel task waiting for a taskToken.

‘UpdateThingShadow’ Lambda Function :

  • This lambda function will store the taskToken to the dynamoDb database and with a primary key as ‘scheduleID’ and attributes a ‘taskToken’.
  • This lambda function will update the desired state of the Thing’s shadow in AWS IOT where the device will receive a message to change the state and after changing the state the device will report back with the reported state.
  • Here the desired state will also contain the field called scheduleID so that the device can report back with the same scheduleID and it will be easy to reference the task.
  • Here reports back means the reported state of the Thing’s shadow will be updated.

AWS IOT Rules :

  • Rules are the part of AWS IOT which gives you an ability to trigger any AWS cloud services when the messages are published in MQTT topics.
  • So we define a rule that whenever there is a change in the  shadow of a concerned device it can trigger the ‘SendTaskSuccess’ with the shadow update as input.

‘SendTaskSuccess’ Lambda Function :

  • Here this lambda function is triggered by AWS IOT rule when there is a change in thing shadow. Here the input is the update in shadow.
  • Now update can be any like metadata, reported state or desired state update. Here we will only consider a reported state update.
  • Now we have a reported state update we can take the scheduleID field from that and extract the taskToken from the dynamodb table.
  • Since we have the taskToken we can send task success to the step function task which is waiting for the taskToken. Here we will also send the reported state along with the taskToken.

This is how we can get the reported state of the concerned device.

Edge cases :

  • Here we know that all calls of MQTT are async and they might be out of order. But as we have a unique scheduleID for every event and also taskToken will only be received by the concerned task hence it is assured that the valid reported state will be received by the respected waiting task
  • If the device doesn’t reports back on time :
    • The subtask will run a timeout task which will call ‘CheckCurrentState’ Lambda and this lambda will check the current reported state of the respective device and here we can waitTaskToken for getting response back from CheckCurrentState Lambda.

Now here we have an output array (output from all parallel tasks) having reported states of the devices after the scheduled events are run parallely.

State 2 : SendNotification 

  • In these the desired states from input and reported states from map state are sent to the ‘SendNotification’ lambda function.

‘SendNotification’ Lambda Function :

  • Here for every event the desired state and reported state are compared checking whether the schedule was overall Successful, Partially Successful  or failed.
  • After that a message body is created.
  • We wanted to send messages to the owner of all devices so we will first get the SNS endpoints of the owners from the database.
  • We will push the Notifications in AWS Simple Notification services, which further will notify the respective owners by means of popup / sms. 

Here in these ways we step function workflow ends and finally after consideration of the reported state we made a fail safe and efficient Schedule and Notification system.

Alternative OpenSource Workflow design approaches:

  • We can use Uber cadence open source for designing the workflow.
  • We can also use Apache Airflow.