FIWARE IoT Agents NGSI v2 UltraLight 2.0

Description: This tutorial extends the connection of IoT devices connecting to FIWARE to use alternate transport. The UltraLight 2.0 IoT Agent created in the previous tutorial is reconfigured to communicate with a set of dummy IoT devices which transfer secure messages over the IOTA Tangle. An additional gateway component is added to the architecture of the previous MQTT tutorial to allow for secure indelible transactions across a distributed ledger network.

The tutorial is mainly concerned with the architecture of the custom components, but uses cUrl commands where necessary, and is also available as Postman documentation

Run in Postman Run in GitPod


What is IOTA?

“Hansel took his little sister by the hand, and followed the pebbles which shone like newly-coined silver pieces, and showed them the way.”

― Jacob Grimm, Grimm's Fairy Tales

The IOTA Tangle is a directed acyclic graph which can be used as a distributed ledger. It is not a traditional blockchain, but works with the concept of a Tangle which contains the current transaction history and links from parents to child transactions which provide a single source of truth in a distributed network. Whenever information is persisted to the tangle it is replicated across all nodes so that any client, anywhere around the world can send valid transactions to a Node.

IOTA positions itself as being an ideal distributed ledger for IoT due to its feeless nature and scalable distributed structure. Obviously when architecting any smart system, the developer needs to compromise between various factors such as price, speed, reliability, security and so on. The previous MQTT tutorial was fast, but contained no security elements and was vulnerable to malicious attack. An IOTA-based IoT system will automatically include secure logging of all events and therefore could be used to for charging customers on an event-by-event basis.

A hybrid system could also be envisaged where some frequent but low risk transactions could be made using a standard MQTT transport (e.g. continuous tracking of the location an ARV), whereas infrequent but chargeable events could be made using a secure system like IOTA (e.g. credit card payment for an entire trip)

The basic IOTA architecture includes the following basic components:

  • Clients: Users of an IOTA network (wallets, apps, etc.) that send transactions to nodes to attach to the Tangle.
  • Nodes: Connected devices responsible for ensuring the integrity of the Tangle. These devices form an IOTA network.
  • Tangle: An attached data structure (public ledger, main ledger), which is replicated on all nodes in an IOTA network.

For the purpose of this tutorial, all data from the dummy devices is being stored within the IOTA Tangle. Each device reading will be placed in a transaction object and attached to the IOTA Tangle, once attached it cannot be changed and is immutable. It obviously takes time for all nodes to agree that a transaction hs occurred, and therefore all communication should be considered as asynchronous.

The IoT Agent for Ultralight currently offers three standard transport mechanisms - HTTP, MQTT and AMPQ. Whereas it would be possible to create a new binding directly for IOTA, in this case, it makes more sense to re-use the existing asynchronous MQTT binding and extend using a gateway solution where a separate microservice deals with the IOTA messages. IoT Agents based on gateway solutions already exist for OPC-UA and LoRaWAN. In the case of the IoT Agent for OPC-UA for example, in its own Tutorial, device readings are passed to an OPC-UA server and the IoT Agent in turn subscribes to the OPC-UA server and transforms messages into NGSI format. With the Gateway solution described in this tutorial effectively MQTT is now just being used as a message bus, so we can provision our IoT devices as MQTT devices and intercept the relevant MQTT topics to transform the data into IOTA Tangle transactions to talk to IOTA Tangle enabled devices. The payload of each message continues to use the existing UltraLight 2.0 syntax, and therefore we can continue to use the same FIWARE generic enabler to connect the devices. It is merely the underlying transport which has been customized in this scenario.

Mosquitto MQTT Broker

Mosquitto is a readily available, open source MQTT broker which will be used during this tutorial. It is available licensed under EPL/EDL. More information can be found at https://mosquitto.org/

Device Monitor

For the purpose of this tutorial, a series of dummy IoT devices have been created, which will be attached to the context broker. Details of the architecture and protocol used can be found in the IoT Sensors tutorial The state of each device can be seen on the UltraLight device monitor web page found at: http://localhost:3000/device/monitor

FIWARE Monitor

Note: In addition to user interactions, All dummy devices will periodically register a heartbeat message

Architecture

This application builds on the components created in previous tutorials. It will make use of two FIWARE components - the Orion Context Broker and the IoT Agent for UltraLight 2.0. Usage of the Orion Context Broker is sufficient for an application to qualify as “Powered by FIWARE”. Both the Orion Context Broker and the IoT Agent rely on open source MongoDB technology to keep persistence of the information they hold. We will also be using the dummy IoT devices created in the previous tutorial Additionally we will add an instance of the Mosquitto MQTT broker which is open source and available under the EPL/EDL and create a custom MQTT-IOTA Gateway to enable us to persist commands to the IOTA Tangle and to subscribe to topics to receive measurements and command acknowledgements when they occur.

Therefore, the overall architecture will consist of the following elements:

  • The FIWARE Orion Context Broker which will receive requests using NGSI-v2
  • The FIWARE IoT Agent for UltraLight 2.0 which will:
    • receive southbound requests using NGSI-v2 and convert them to UltraLight 2.0 MQTT topics for the MQTT Broker
    • listen to the MQTT Broker on registered topics to send measurements northbound
  • The Mosquitto MQTT Broker which acts as a central communication point, passing MQTT topics between the IoT Agent and IoT devices as necessary.
  • The underlying MongoDB database :
    • Used by the Orion Context Broker to hold context data information such as data entities, subscriptions and registrations
    • Used by the IoT Agent to hold device information such as device URLs and Keys
  • A webserver acting as set of dummy IoT devices using the UltraLight 2.0 protocol running over the IOTA Tangle.
  • An MQTT-IOTA gateway which persists MQTT topic messages to the tangle and vice-vera.

Since all interactions between the elements are initiated by HTTP or MQTT requests over TCP, the entities can be containerized and run from exposed ports.

The necessary configuration information for wiring up the Mosquitto MQTT Broker, the IoT devices and the IoT Agent can be seen in the services section of the associated docker-compose.yml file:

Mosquitto Configuration

mosquitto:
    image: eclipse-mosquitto
    hostname: mosquitto
    container_name: mosquitto
    networks:
        - default
    expose:
        - '1883'
        - '9001'
    ports:
        - '1883:1883'
        - '9001:9001'
    volumes:
        - ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf

The mosquitto container is listening on two ports:

  • Port 1883 is exposed so we can post MQTT topics
  • Port 9001 is the standard port for HTTP/Websocket communications

The attached volume is a configuration file used to increase the debug level of the MQTT Message Broker.

Dummy IoT Devices Configuration

tutorial:
    image: quay.io/fiware/tutorials.context-provider
    hostname: iot-sensors
    container_name: fiware-tutorial
    networks:
        - default
    expose:
        - '3000'
        - '3001'
    ports:
        - '3000:3000'
        - '3001:3001'
    environment:
        - 'DEBUG=tutorial:*'
        - 'WEB_APP_PORT=3000'
        - 'DUMMY_DEVICES_PORT=3001'
        - 'DUMMY_DEVICES_API_KEYS=1068318794,3020035,3314136,3089326'
        - 'DUMMY_DEVICES_TRANSPORT=IOTA'
        - 'IOTA_NODE=https://chrysalis-nodes.iota.cafe'
        - 'IOTA_MESSAGE_INDEX=fiware'

The tutorial container is listening on two ports:

  • Port 3000 is exposed, so we can see the web page displaying the Dummy IoT devices.
  • Port 3001 is exposed purely for tutorial access - so that cUrl or Postman can make UltraLight commands without being part of the same network.

The tutorial container is driven by environment variables as shown:

Key Value Description
DEBUG tutorial:* Debug flag used for logging
WEB_APP_PORT 3000 Port used by web-app which displays the dummy device data
DUMMY_DEVICES_PORT 3001 Port used by the dummy IoT devices to receive commands
DUMMY_DEVICES_API_KEYS 4jggokgpepnvsb2uv4s40d59ov List of security key used for UltraLight interactions - used to ensure the integrity of interactions between the devices and the IoT Agent
DUMMY_DEVICES_TRANSPORT IOTA The transport protocol used by the dummy IoT devices
IOTA_NODE https://chrysalis-nodes.iota.cafe Starting IOTA node for the Gateway to connect to
IOTA_MESSAGE_INDEX fiware Message index used to persist the data devices

The other tutorial container configuration values described in the YAML file are not used in this tutorial.

IoT Agent for UltraLight 2.0 Configuration

The IoT Agent for UltraLight 2.0 can be instantiated within a Docker container. An official Docker image is available from Docker Hub tagged fiware/iotagent-ul. The necessary configuration can be seen below:

iot-agent:
    image: quay.io/fiware/iotagent-ul:latest
    hostname: iot-agent
    container_name: fiware-iot-agent
    depends_on:
        - mongo-db
    networks:
        - default
    expose:
        - '4041'
    ports:
        - '4041:4041'
    environment:
        - IOTA_CB_HOST=orion
        - IOTA_CB_PORT=1026
        - IOTA_NORTH_PORT=4041
        - IOTA_REGISTRY_TYPE=mongodb
        - IOTA_LOG_LEVEL=DEBUG
        - IOTA_TIMESTAMP=true
        - IOTA_CB_NGSI_VERSION=v2
        - IOTA_AUTOCAST=true
        - IOTA_MONGO_HOST=mongo-db
        - IOTA_MONGO_PORT=27017
        - IOTA_MONGO_DB=iotagentul
        - IOTA_PROVIDER_URL=http://iot-agent:4041
        - IOTA_MQTT_HOST=mosquitto
        - IOTA_MQTT_PORT=1883

The iot-agent container relies on the presence of the Orion Context Broker and uses a MongoDB database to hold device information such as device URLs and Keys. The container is listening on a single port:

  • Port 4041 is exposed purely for tutorial access - so that cUrl or Postman can make provisioning commands without being part of the same network.

The iot-agent container is driven by environment variables as shown:

Key Value Description
IOTA_CB_HOST orion Hostname of the context broker to update context
IOTA_CB_PORT 1026 Port that context broker listens on to update context
IOTA_NORTH_PORT 4041 Port used for Configuring the IoT Agent and receiving context updates from the context broker
IOTA_REGISTRY_TYPE mongodb Whether to hold IoT device info in memory or in a database
IOTA_LOG_LEVEL DEBUG The log level of the IoT Agent
IOTA_TIMESTAMP true Whether to supply timestamp information with each measurement received from attached devices
IOTA_CB_NGSI_VERSION v2 Whether to supply use NGSI v2 when sending updates for active attributes
IOTA_AUTOCAST true Ensure Ultralight number values are read as numbers not strings
IOTA_MONGO_HOST context-db The hostname of mongoDB - used for holding device information
IOTA_MONGO_PORT 27017 The port mongoDB is listening on
IOTA_MONGO_DB iotagentul The name of the database used in mongoDB
IOTA_PROVIDER_URL http://iot-agent:4041 URL passed to the Context Broker when commands are registered, used as a forwarding URL location when the Context Broker issues a command to a device
IOTA_MQTT_HOST mosquitto The hostname of the MQTT Broker
IOTA_MQTT_PORT 1883 The port the MQTT Broker is listening on to receive topics

As you can see, use of the MQTT transport is driven by only two environment variables IOTA_MQTT_HOST and IOTA_MQTT_PORT

MQTT-IOTA Gateway Configuration

iota-gateway:
    image: iota-gateway
    hostname: iota-gateway
    container_name: iota-gateway
    build:
        context: iota-gateway
        dockerfile: Dockerfile
    networks:
        - default
    environment:
        - 'DEBUG=gateway:*'
        - 'MQTT_BROKER_URL=mqtt://mosquitto'
        - 'IOTA_NODE=https://chrysalis-nodes.iota.cafe'
        - 'IOTA_MESSAGE_INDEX=fiware'

The iota-gateway container is a middleware connecting to the MQTT broker and reading and persisting transactions onto IOTA Tangle. This middleware therefore needs to connect to both the MQTT broker and the IOTA Tangle and repeats some parameters described above.

Video: NGSI-v2 IoT Agent Gateway

Click on the image above to watch a demo of this tutorial describing how to use an IoT Agent Gateway with IOTA.

Start Up

Before you start you should ensure that you have obtained or built the necessary Docker images locally. Please clone the repository and create the necessary images by running the commands as shown:

git clone https://github.com/FIWARE/tutorials.IoT-over-IOTA.git
cd tutorials.IoT-over-IOTA
git checkout NGSI-v2

./services create

Thereafter, all services can be initialized from the command-line by running the services Bash script provided within the repository:

./services start

:information_source: Note: If you want to clean up and start over again you can do so with the following command:

./services stop

Provisioning Devices

Provisioning devices is not the focus of this tutorial, and all the necessary device provisioning occurs automatically when the tutorial is started. However, for completeness the provisioning requests are repeated here and described below. It is not necessary to re-run these commands.

A series of service groups are created to associate classes of devices to an API Key, in the example below services have been created for type=Bell and type=Motion. It should be noted that the resource attribute has been left blank and the transport set to MQTT - this is the same as the in the previous MQTT tutorial, since the IoT Agent is merely sending messsages to the MQTT broker and has no idea that a custom gateway component is also involved.

curl -X POST  \
  "http://localhost:4041/iot/services" \
  -H 'Content-Type: application/json' \
  -H 'fiware-service: openiot' \
  -H 'fiware-servicepath: /' \
  -d '{
 "services": [
   {
     "apikey":      "1068318794",
     "cbroker":     "'"http://orion:1026"'",
     "entity_type": "Motion",
     "resource":    "",
     "protocol":    "PDI-IoTA-UltraLight",
     "transport":   "MQTT",
     "timezone":    "Europe/Berlin",
     "attributes": [
        {"object_id": "c", "name":"count", "type":"Integer"},
        {"object_id": "t", "name": "TimeInstant", "type":"DateTime"}
      ],
      "static_attributes": [
          {"name": "category", "type":"Text", "value": ["sensor"]},
          {"name": "controlledProperty", "type": "Text", "value": "motion"},
          {"name": "function", "type": "Text", "value":["sensing"]},
          {"name": "supportedProtocol", "type": "Text", "value": ["ul20"]},
          {"name": "supportedUnits", "type": "Text", "value": "C62"}
      ]
   },
   {
     "apikey":      "3020035",
     "cbroker":     "'"http://orion:1026"'",
     "entity_type": "Bell",
     "resource":    "",
     "protocol":    "PDI-IoTA-UltraLight",
     "transport":   "MQTT",
     "timezone":    "Europe/Berlin",
     "commands": [
        {
          "name": "ring",
          "type": "command"
        }
      ],
      "static_attributes": [
          {"name": "category", "type":"Text", "value": ["actuator"]},
          {"name": "controlledProperty", "type": "Text", "value": "noiseLevel"},
          {"name": "function", "type": "Text", "value":["onOff"]},
          {"name": "supportedProtocol", "type": "Text", "value": ["ul20"]}
      ]
   }
 ]
}'

Commands and measures defined when individual devices are provisioned. Once again the transport is set to MQTT.

curl -X POST \
  "http://localhost:4041/iot/devices" \
  -H 'Content-Type: application/json' \
  -H 'fiware-service: openiot' \
  -H 'fiware-servicepath: /' \
  -d '{
 "devices": [
   {
     "device_id":   "motion001",
     "entity_name": "urn:ngsi-ld:Motion:001",
     "entity_type": "Motion",
     "transport":   "MQTT",
     "static_attributes": [
         {"name": "refStore", "type": "Relationship","value": "urn:ngsi-ld:Store:001"}
     ]
   },
   {
      "device_id": "bell001",
      "entity_name": "urn:ngsi-ld:Bell:001",
      "entity_type": "Bell",
      "transport":   "MQTT",
      "static_attributes": [
          {"name": "refStore", "type": "Relationship","value": "urn:ngsi-ld:Store:001"}
      ]
    }
  ]
}
'

Device Monitor

The device monitor can be found at: http://localhost:3000/device/monitor - open the web page to view the state of the devices and view the persisted IOTA Tangle traffic.

Display the IOTA-Gateway logs (1st Terminal)

Open a new terminal, and follow the iota-gateway Docker container as follows:

docker logs -f iota-gateway

The terminal will then be ready to display received messages.

1st terminal - Gateway Result:

If the MQTT-IOTA Gateway is functioning correctly, the following messages should be displayed

2021-12-07T15:28:42.855Z gateway:app connected to IOTA Tangle: https://chrysalis-nodes.iota.cafe
2021-12-07T15:28:42.862Z gateway:app Subscribing to 'messages/indexation/fiware/attrs'
2021-12-07T15:28:42.872Z gateway:app Subscribing to 'messages/indexation/fiware/cmdexe'

The gateway needs to subscribe to the IOTA Tangle to receive measures and command acknowledgements.

Display the Dummy Device logs (2nd Terminal)

A sensor sending northbound measurements will persist transactions to the IOTA Tangle to which will be passed on to any subscriber than wants them. The sensor does not need to make a connection to the subscriber directly. Similarly, any connected actuators will need to subscribe to an IOTA Tangle message topic to receive aby commands that are relevant to them.

Open a new terminal, and run a fiware-tutorial Docker container to send a message as follows:

docker logs -f fiware-tutorial

The terminal will then be ready to display received messages.

2nd terminal - Device Result:

If the Devices are functioning correctly, the message should be received in the other terminal:

2021-12-07T15:29:22.163Z tutorial:server Listening on port 3000
2021-12-07T15:29:22.166Z tutorial:server Listening on port 3001
2021-12-07T15:29:22.522Z tutorial:application MongoDB is connected.
2021-12-07T15:29:22.612Z tutorial:iot-device connected to IOTA Tangle: https://chrysalis-nodes.iota.cafe
2021-12-07T15:29:22.613Z tutorial:iot-device Subscribing to 'messages/indexation/fiware/cmd'

Using the IOTA Tangle as a Transport

Sending Commands

Since all the devices have been pre-provisioned, a bell can be rung using a standard NGSI-v2 PATCH request:

Note: The major use case for an IOTA Tangle transport is very different to the transports used in previous tutorials (HTTP, MQTT). Specifically IOTA Tangle is designed for IoT Sensors to indelibly record transactions across a global network with a resolution time of around 60 seconds for each transaction. Therefore compared to previous tutorials repsonses will be very slow.

Every transaction is added to a directed acyclic graph, so it can be traced and therefore the IOTA Tangle is a good candidate for a single source of truth and trust in high value context data. However, the IOTA Tangle would usually be a poor choice for inherently low-value essentially meaningless context data changes within a tutorial application.

1 Request:

curl -L -X PATCH 'http://localhost:1026/v2/entities/urn:ngsi-ld:Bell:001/attrs' \
-H 'fiware-service: openiot' \
-H 'fiware-servicepath: /' \
-H 'Content-Type: application/json' \
--data-raw '{
  "ring": {
      "type" : "command",
      "value" : ""
  }
}'

The NGSI request is transformed into an MQTT message (with an Ultralight payload) which is received by the MQTT-IOTA Gateway - this message is then persisted to the IOTA Tangle as shown:

1st terminal - Gateway Result:

2021-12-07T15:50:54.848Z gateway:southbound Command received from MQTT bell001@ring|
2021-12-07T15:51:12.580Z gateway:southbound Command pushed to Tangle: i=bell001&k=1068318794&d=bell001@ring|  to fiware/cmd
2021-12-07T15:51:12.581Z gateway:southbound messageId: 40431e6e39ade9babe02ef342ee9267f69982fe42db8f5d3f32d57bb686120d5

The dummy device is also subscribing to IOTA Tangle messages, a message is received and the device is activated (in this case the bell will ring). At this point an acknowledgement is placed onto the fiware/cmdexe topic:

2nd terminal - Device Result:

2021-12-07T15:51:12.583Z tutorial:iot-device IOTA Tangle message received:  40431e6e39ade9babe02ef342ee9267f69982fe42db8f5d3f32d57bb686120d5
2021-12-07T15:51:17.806Z tutorial:ultralight command response sent to fiware/cmdexe
2021-12-07T15:51:17.806Z tutorial:ultralight 960e8ac4a9e22e360f7e92c3a7b9ac3b71c59950fd2fba7f4be551f930342f94
2021-12-07T15:51:17.812Z tutorial:devices actuateDevice: bell001 ring

If you are viewing the device monitor page, you can also see the state of the bell change.

The Gateway receives the acknowledgement from the IOTA Tangle fiware/cmdexe topic and returns the result of the request to the IoT Agent.

1st terminal - Gateway Result:

2021-12-07T15:51:18.022Z gateway:northbound Command response received from Tangle: i=bell001&k=1068318794&d=bell001@ring| ring OK
2021-12-07T15:51:18.027Z gateway:northbound Sent to MQTT topic /1068318794/bell001/cmdexe
2021-12-07T15:51:34.741Z gateway:northbound Command response received from Tangle: i=bell001&k=1068318794&d=bell001@ring| ring OK

The result of the command to ring the bell can be read by querying the entity within the Orion Context Broker.

2 Request:

curl -L -X GET 'http://localhost:1026/v2/entities/urn:ngsi-ld:Bell:001?options=keyValues' \
-H 'fiware-service: openiot' \
-H 'fiware-servicepath: /'

Response:

{
    "id": "urn:ngsi-ld:Bell:001",
    "type": "Bell",
    "TimeInstant": "2021-12-07T15:51:36.219Z",
    "category": ["actuator"],
    "controlledProperty": "noiseLevel",
    "function": ["onOff"],
    "refStore": "urn:ngsi-ld:Store:001",
    "ring_info": "OK",
    "ring_status": "OK",
    "supportedProtocol": ["ul20"],
    "ring": ""
}

The TimeInstant shows last the time any command associated with the entity has been invoked. The result of ring command can be seen in the value of the ring_info attribute.

Note: IOTA Transactions are not instantaneous, if the bell is queried before the transaction is complete, the response will leave the ring_status as PENDING

{ "id": "urn:ngsi-ld:Bell:001", "type": "Bell", "TimeInstant": "2021-12-07T15:51:36.219Z", "category": [ "actuator" ], "controlledProperty": "noiseLevel", "function": [ "onOff" ], "refStore": "urn:ngsi-ld:Store:001", "ring_info": "UNKNOWN", "ring_status": "PENDING", "supportedProtocol": [ "ul20" ], "ring": "" }

Furthermore, all transactions can be found on the IOTA Tangle under https://explorer.iota.org/mainnet/message/<message_id>, for example https://explorer.iota.org/mainnet/message/40431e6e39ade9babe02ef342ee9267f69982fe42db8f5d3f32d57bb686120d5 permanently holds the following data:

Which indicates a request was sent to ring the bell.

Sending Device Measures

A measure from a device can be simulated by selecting Detect Motion from the dropdown and clicking on send.

The device persists the measure to the fiware/attrs topic on the IOTA Tangle Mainnet.

2nd terminal - Device Result:

2021-12-07T16:34:25.767Z tutorial:devices fireMotionSensor
2021-12-07T16:34:26.185Z tutorial:northbound sendIOTAMeasure: motion001
2021-12-07T16:34:26.479Z tutorial:ultralight measure sent to fiware/attrs
2021-12-07T16:34:26.479Z tutorial:ultralight da4df31054df529a3ade74befb84edabf7697ae8a3a9ee3481be08ee0aabb3e7

Once the transactions is settled, it is passed onto the subscribing Gateway component.

1st terminal - Gateway Result:

2021-12-07T16:35:25.679Z gateway:northbound Measure received from Tangle: i=motion001&k=1068318794&d=c|0|t|2021-12-07T16:34:44.891Z
2021-12-07T16:35:25.680Z gateway:northbound Sent to MQTT topic /1068318794/motion001/attrs

There may be a noticeable lag between reading the measure and it being received at the context broker. The payload of the measure therefore contains a timestamp t|2021-12-07T16:34:44.891Z which is mapped to TimeInstant in the IoT Agent to ensure that the correct metadata is associated with the measure in the context broker.

The state of the sensor can be read by querying the entity within the Orion Context Broker.

3 Request:

curl -L -X GET 'http://localhost:1026/v2/entities/urn:ngsi-ld:Motion:001?options=keyValues' \
-H 'fiware-service: openiot' \
-H 'fiware-servicepath: /'

Response:

{
    "id": "urn:ngsi-ld:Motion:001",
    "type": "Motion",
    "TimeInstant": "2021-12-07T16:34:44.891Z",
    "category": ["sensor"],
    "controlledProperty": "motion",
    "count": "0",
    "function": ["sensing"],
    "refStore": "urn:ngsi-ld:Store:001",
    "supportedProtocol": ["ul20"],
    "supportedUnits": "C62"
}

MQTT-IOTA Gateway - Sample Code

The MQTT-IOTA Gateway is a simple application written in Node.js. Its only function is passing data between the two transports. MQTT Client libraries already exist so the application can be set up to listen to the normal MQTT topic for IoT Agent actuations.

const mqtt = require('mqtt');
const MQTT_CLIENT = mqtt.connect('mqtt://mosquitto');
MQTT_CLIENT.on('connect', () => {
    MQTT_CLIENT.subscribe('/+/+/cmd');
});
MQTT_CLIENT.on('message', Southbound.command);

Similarly, there are equivalent client libraries available in multiple languages for persisting and listening to changes on the IOTA Tangle. The MQTT-IOTA Gateway needs to listen on two topics - one for device measures and a second one for acknowledgements of commands:

const iotaClient = require('@iota/client');
const IOTA_CLIENT = new iotaClient.ClientBuilder().node('https://chrysalis-nodes.iota.cafe').build();

IOTA_CLIENT.getInfo()
    .then(() => {
        IOTA_CLIENT.subscriber()
            .topic(IOTA_MESSAGE_INDEX + 'messages/indexation/fiware/attrs')
            .subscribe((err, data) => {
                const messageId = IOTA_CLIENT.getMessageId(data.payload);
                IOTA_CLIENT.getMessage()
                    .data(messageId)
                    .then((messageData) => {
                        Northbound.measure(messageData);
                    });
            });
        IOTA_CLIENT.subscriber()
            .topic(IOTA_MESSAGE_INDEX + 'messages/indexation/fiware/cmdexe')
            .subscribe((err, data) => {
                const messageId = IOTA_CLIENT.getMessageId(data.payload);
                IOTA_CLIENT.getMessage()
                    .data(messageId)
                    .then((messageData) => {
                        Northbound.commandResponse(messageData);
                    });
            });
    })
    .catch((err) => {
        debug(err);
    });

MQTT-IOTA Gateway Southbound - Sample Code

For the southbound traffic, the API Key and device ID are extracted from the MQTT topic and moved into the IOTA payload. The syntax of the IOTA payload (with i, k and d attributes) is based on the Ultralight HTTP syntax. The message is then persisted to the Tangle using an appropriate index:

function command(topic = 'cmd', message) {
    const parts = topic.toString().split('/');
    const apiKey = parts[1];
    const deviceId = parts[2];
    const action = parts[3];
    forwardAsIOTATangle(apiKey, deviceId, message.toString(), action);
}

function forwardAsIOTATangle(apiKey, deviceId, state, topic) {
    const payload = 'i=' + deviceId + '&k=' + apiKey + '&d=' + state;
    IOTA_CLIENT.message()
        .index('fiware/' + topic)
        .data(payload)
        .submit()
        .then((message) => {
            debug('messageId: ' + message.messageId);
        });
}

MQTT-IOTA Gateway Northbound - Sample Code

Northbound traffic is similar - the payload is received from the IOTA Tangle, unmarshalled to reveal the API Key and device id, and the posted to an appropriate MQTT Topic.

function unmarshall(payload) {
    const parts = payload.split('&');
    const obj = {};
    parts.forEach((elem) => {
        const keyValues = elem.split('=');
        obj[keyValues[0]] = keyValues[1];
    });
    return obj;
}

function measure(messageData) {
    const payload = Buffer.from(messageData.message.payload.data, 'hex').toString('utf8');
    const data = unmarshall(payload);
    forwardAsMQTT(data.k, data.i, data.d, 'attrs');
}

function forwardAsMQTT(apiKey, deviceId, state, topic) {
    const mqttTopic = '/' + apiKey + '/' + deviceId + '/' + topic;
    MQTT_CLIENT.publish(mqttTopic, state);
}

The full code of the MQTT-IOTA Gateway includes additional error handling and asynchronous data handling to defer the execution of a function until the next Event Loop iteration.

IOTA Tangle Device - Sample Code

The code for a device to connect to the IOTA Tangle is repeated on the device. Actuators must listen to an agreed topic in order to be informed of commands. process.nextTick() can be used to ensure commands are not missed and can be processed when time permits.

const iotaClient = require('@iota/client');
const IOTA_CLIENT = new iotaClient.ClientBuilder().node('https://chrysalis-nodes.iota.cafe').build();

IOTA_CLIENT.getInfo().then(() => {
    IOTA_CLIENT.subscriber()
        .topic('messages/indexation/cmd')
        .subscribe((err, data) => {
            return process.nextTick(() => {
                readFromTangle(data);
            });
        });
});

function readFromTangle(data) {
    const messageId = IOTA_CLIENT.getMessageId(payload);
    IOTA_CLIENT.getMessage()
        .data(messageId)
        .then((messageData) => {
            const payload = Buffer.from(messageData.message.payload.data, 'hex').toString('utf8');
            Southbound.processIOTAMessage(messageId, payload);
        });
}

IOTA Tangle Device Command Acknowledgement

For real devices, the callback of a successful actuation should cause an acknowledgement to be sent. Acknowledgements are queued and sent in order. If an error occurs the acknowledgement must be resent or the command will remain in a PENDING state.

function processIOTAMessage(apiKey, deviceId, message) {
    const keyValuePairs = message.split('|') || [''];
    const command = getUltralightCommand(keyValuePairs[0]);
    process.nextTick(() => {
        IoTDevices.actuateDevice(deviceId, command).then((response) => {
            queue.push({ responsePayload, deviceId, command });
        });
    });
}
const async = require('async');
const queue = async.queue((data, callback) => {
    IOTA_CLIENT.message()
        .index('fiware/cmdexe')
        .data(data.responsePayload)
        .submit()
        .then((response) => {
            callback();
        })
        .catch((err) => {
            setTimeout(() => {
                queue.push(data);
            }, 1000);
            callback(err);
        }, 8);
});

IOTA Tangle Device measure - Sample Code

Measures are dealt with in a similar manner. The payload is created in Ultralight syntax (including a timestamp) and pushed to a queue. The queue sends the measure to the IOTA Tangle and reschedules any failures.

function sendAsIOTA(deviceId, state) {
    const payload = 'i=' + deviceId + '&k=' + getAPIKey(deviceId) + '&d=' + state + '|t|' + new Date().toISOString();
    queue.push(payload);
}
const async = require('async');

const queue = async.queue((payload, callback) => {
    IOTA_CLIENT.message()
        .index('fiware/attrs')
        .data(payload)
        .submit()
        .then((message) => {
            callback();
        })
        .catch((err) => {
            setTimeout(() => {
                // resending measure
                queue.push(payload);
            }, 1000);
            callback(err);
        }, 8);
});