
Streaming data from Event Hubs or IoT Hub to the console

Various interactive options are available to inspect messages in Azure Event Hubs or IoT Hub, such as the Azure Portal, Service Bus Explorer (Windows only), or Azure IoT Explorer.

However, for developer convenience or scripting automation, you may prefer using a CLI tool to stream the messages directly into a console or file. The Kafka Console Consumer is a great tool for doing this, using the Kafka-compatible endpoint available in Event Hubs, and in IoT Hub through its Event Hubs-compatible built-in endpoint.

The walkthrough below illustrates using the Azure IoT Device Telemetry Simulator to produce a sample message load, together with the Kafka Console Consumer to consume those messages, both running in Docker.

With Azure Event Hubs

Create an Azure Event Hubs namespace in the Standard tier with all other settings as default (the Kafka endpoint is not available in the Basic tier). In the namespace, create an Event Hub named test.
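
If you prefer scripting the setup, the equivalent Azure CLI commands look roughly like this (resource names and region are placeholders):

az group create --name MY_RESOURCE_GROUP --location westeurope
az eventhubs namespace create --resource-group MY_RESOURCE_GROUP --name MY_NAMESPACE --sku Standard
az eventhubs eventhub create --resource-group MY_RESOURCE_GROUP --namespace-name MY_NAMESPACE --name test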

Create a file docker-compose.yml with the following content:

version: "3.5"
services:
  simulator:
    image: iottelemetrysimulator/azureiot-telemetrysimulator
    environment:
      EventHubConnectionString: "$EVENTHUBS_CONNECTIONSTRING;EntityPath=$EVENT_HUB_NAME"
      MessageCount: 0
    deploy:
      restart_policy:
        condition: on-failure

  dumper:
    image: confluentinc/cp-kafka:6.2.0
    # see https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/kafka-cli
    command:
    - /bin/bash
    - -euxc
    - |
      echo -e 'security.protocol=SASL_SSL\nsasl.mechanism=PLAIN' > client_common.properties
      echo -e 'KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="$EVENTHUBS_CONNECTIONSTRING"; }; ' > jaas.conf
      export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
      kafka-console-consumer --topic $EVENT_HUB_NAME --bootstrap-server $EVENTHUBS_ENDPOINT --consumer.config client_common.properties
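
Note that $$ConnectionString in the jaas.conf line above is docker-compose escaping for a literal dollar sign: the SASL username expected by the Event Hubs Kafka endpoint is the literal string $ConnectionString, and the password is the full connection string.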

Create a file run-solution.sh with the following content:

#!/bin/bash

set -euo pipefail

rg="$1"
namespace="$2"
topic="$3"

export EVENTHUBS_CONNECTIONSTRING=$(az eventhubs namespace authorization-rule keys list --resource-group "$rg" --namespace-name "$namespace" --name RootManageSharedAccessKey --query primaryConnectionString -o tsv)
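# The Kafka-compatible endpoint of the namespace listens on port 9093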
export EVENTHUBS_ENDPOINT="$namespace.servicebus.windows.net:9093"
export EVENT_HUB_NAME="$topic"

docker-compose up

Run the script, passing your resource group name, namespace name, and Event Hub name as arguments.

bash run-solution.sh MY_RESOURCE_GROUP MY_NAMESPACE test
Creating eventhubs_simulator_1 ... done
Creating eventhubs_dumper_1    ... done
Attaching to eventhubs_dumper_1, eventhubs_simulator_1
simulator_1  | Starting simulator v1.0
dumper_1     | {"deviceId": "sim000001", "time": "2021-06-25T11:12:47.0829887Z", "counter": 11}
dumper_1     | {"deviceId": "sim000001", "time": "2021-06-25T11:12:48.1410210Z", "counter": 12}
dumper_1     | {"deviceId": "sim000001", "time": "2021-06-25T11:12:49.1927594Z", "counter": 13}
dumper_1     | {"deviceId": "sim000001", "time": "2021-06-25T11:12:50.2480419Z", "counter": 14}
dumper_1     | {"deviceId": "sim000001", "time": "2021-06-25T11:12:51.2961822Z", "counter": 15}

With Azure IoT Hub

Create an Azure IoT Hub instance with all settings as default.
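
As a scripted alternative, an IoT Hub can be created with the Azure CLI along these lines (names are placeholders):

az iot hub create --resource-group MY_RESOURCE_GROUP --name MY_IOT_HUB --sku S1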

Create a file docker-compose.yml with the following content:

version: "3.5"
services:
  provisioner:
    image: iottelemetrysimulator/azureiot-simulatordeviceprovisioning
    environment:
      IotHubConnectionString: $REGISTRY_CONNECTIONSTRING
  simulator:
    image: iottelemetrysimulator/azureiot-telemetrysimulator
    environment:
      IotHubConnectionString: $DEVICE_CONNECTIONSTRING
      MessageCount: 0
    deploy:
      restart_policy:
        condition: on-failure

  dumper:
    image: confluentinc/cp-kafka:6.2.0
    # see https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/kafka-cli
    command:
    - /bin/bash
    - -euxc
    - |
      echo -e 'security.protocol=SASL_SSL\nsasl.mechanism=PLAIN' > client_common.properties
      echo -e 'KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="$EVENTHUBS_CONNECTIONSTRING"; }; ' > jaas.conf
      export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
      kafka-console-consumer --topic $EVENT_HUB_NAME --bootstrap-server $EVENTHUBS_ENDPOINT --consumer.config client_common.properties
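
The dumper service is identical to the Event Hubs example: the built-in events endpoint of an IoT Hub is Event Hubs-compatible, so the same Kafka consumer configuration works against it.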

Create a file run-solution.sh with the following content:

#!/bin/bash

set -euo pipefail

iothub="$1"

if ! [ -e .env ]; then
  props=$(az iot hub show -n "$iothub" --query '[{endpoint: properties.eventHubEndpoints.events.endpoint,path:properties.eventHubEndpoints.events.path}]' -o tsv)
  read -r EVENTHUBS_ENDPOINT EVENT_HUB_NAME <<< "$props"
  DEVICE_CONNECTIONSTRING=$(az iot hub connection-string show -n "$iothub" --policy-name device -o tsv)
  REGISTRY_CONNECTIONSTRING=$(az iot hub connection-string show -n "$iothub" --policy-name registryReadWrite -o tsv)
  EVENTHUBS_CONNECTIONSTRING=$(az iot hub connection-string show --hub-name "$iothub" --default-eventhub -o tsv)
  EVENTHUBS_ENDPOINT=${EVENTHUBS_ENDPOINT#sb://}
  EVENTHUBS_ENDPOINT=${EVENTHUBS_ENDPOINT%/}
  echo "EVENTHUBS_ENDPOINT=$EVENTHUBS_ENDPOINT:9093" >> .env
  echo "EVENT_HUB_NAME=$EVENT_HUB_NAME" >> .env
  echo "DEVICE_CONNECTIONSTRING=$DEVICE_CONNECTIONSTRING" >> .env
  echo "REGISTRY_CONNECTIONSTRING=$REGISTRY_CONNECTIONSTRING" >> .env
  echo "EVENTHUBS_CONNECTIONSTRING=$EVENTHUBS_CONNECTIONSTRING" >> .env
fi

cat .env

docker-compose up
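
The script caches the connection settings in a .env file, which docker-compose loads automatically, so subsequent runs skip the az calls; delete .env if you point the script at a different hub.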

Run the script, passing your IoT Hub name as an argument.

bash run-solution.sh MY_IOT_HUB

Starting iot_simulator_1   ... done
Starting iot_provisioner_1 ... done
Starting iot_dumper_1      ... done
Attaching to iot_provisioner_1, iot_dumper_1, iot_simulator_1
provisioner_1  | Starting device provisioning
provisioner_1  | Finished device provisioning
provisioner_1  | Device count = 1
provisioner_1  | Total devices created = 0
provisioner_1  | Total devices deleted = 0
provisioner_1  | Total errors = 1
provisioner_1  | Time = 2329ms
iot_provisioner_1 exited with code 0
simulator_1    | 
simulator_1    | Starting simulator v1.0
simulator_1    | 
simulator_1    | Device count = 1
simulator_1    | Device prefix = sim
simulator_1    | Device 0-last = (sim000001-sim000001)
simulator_1    | Device index = 1
simulator_1    | Message count = 0
simulator_1    | Interval = 1000ms
simulator_1    | Template = Template: {"deviceId": "$.DeviceId", "time": "$.Time", "counter": $.Counter}
dumper_1       | {"deviceId": "sim000001", "time": "2021-06-25T10:04:16.0151091Z", "counter": 13}
dumper_1       | {"deviceId": "sim000001", "time": "2021-06-25T10:04:17.2424891Z", "counter": 14}
dumper_1       | {"deviceId": "sim000001", "time": "2021-06-25T10:04:18.4959163Z", "counter": 15}
dumper_1       | {"deviceId": "sim000001", "time": "2021-06-25T10:04:19.6447100Z", "counter": 16}

With kafkacat

The docker-compose.yml files above use kafka-console-consumer. Alternatively, the kafkacat utility can be used. As it is written in C, it has a faster start-up time and great options to inspect metadata and format the output.

  kafkacat:
    image: confluentinc/cp-kafkacat:5.3.5
    command:
    - /bin/bash
    - -euxc
    - |
      kafkacat \
        -t $EVENT_HUB_NAME \
        -b $EVENTHUBS_ENDPOINT \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanism=PLAIN \
        -X sasl.username='$$ConnectionString' \
        -X sasl.password='$EVENTHUBS_CONNECTIONSTRING' \
        -C \
        -o end
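
kafkacat can also decorate each message with metadata. For instance, appending a format string along these lines to the command above prints the partition and offset of every message (format tokens as documented in the kafkacat README):

        -f 'Partition %p, offset %o: %s\n'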

Sample application: load testing

We will increase the event rate and use a small shell script to compute the number of messages received per second.

From the Event Hubs example above, modify the docker-compose.yml file to write the dumped output to a data file:

  dumper:
    image: confluentinc/cp-kafka:6.2.0
    volumes:
      - data-volume:/var/tmp
    command:
    - /bin/bash
    - -euxc
    - |
      echo -e 'security.protocol=SASL_SSL\nsasl.mechanism=PLAIN' > client_common.properties
      echo -e 'KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="$EVENTHUBS_CONNECTIONSTRING"; }; ' > jaas.conf
      export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
      kafka-console-consumer --topic $EVENT_HUB_NAME --bootstrap-server $EVENTHUBS_ENDPOINT --consumer.config client_common.properties > /var/tmp/data.json

In the docker-compose.yml file, add a container to regularly report statistics, and the data volume definition:

  jq:
    image: stedolan/jq
    volumes:
      - data-volume:/var/tmp
    entrypoint: /bin/bash
    command:
    - -euc
    - |
      trap 'exit' SIGINT
      while true; do
        # Pretty-print each message, keep the "time" field, truncate the
        # timestamp to one-second resolution, then count messages per second.
        cat /var/tmp/data.json | jq . | grep time | cut -f1 -d. | sort | uniq -c | tail -10
        echo ===================================================================
        sleep 10
      done

volumes:
  data-volume:
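
Both services mount the same named data-volume, so the jq container reads the data.json file that the consumer writes.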

Under the environment section of the simulator container, add settings to generate 100 events per second:

      DeviceCount: 10
      Interval: 100  # in milliseconds
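
With 10 devices each sending one message every 100 ms, the aggregate rate is 100 messages per second.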

Running the script again shows that the event rate is stable:

simulator_1  | 2021-06-25T11:44:11.7620481Z: 1700 total messages have been sent @ 100.21 msgs/sec
simulator_1  | 2021-06-25T11:44:12.7539624Z: 1800 total messages have been sent @ 100.82 msgs/sec
simulator_1  | 2021-06-25T11:44:13.7602160Z: 1900 total messages have been sent @ 99.38 msgs/sec
simulator_1  | 2021-06-25T11:44:14.7539800Z: 2000 total messages have been sent @ 100.63 msgs/sec
jq_1         |     100   "time": "2021-06-25T11:44:11
jq_1         |     100   "time": "2021-06-25T11:44:12
jq_1         |     100   "time": "2021-06-25T11:44:13
jq_1         |      70   "time": "2021-06-25T11:44:14
jq_1         | ===================================================================

If you do not manage to reach this rate, network latency to the Event Hubs endpoint might be too high; try running the setup from an Azure Virtual Machine in the same region.
