Simple test

Ensure your device works with this simple test.

examples/aws_iot_simpletest.py
  1# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
  2# SPDX-License-Identifier: MIT
  3
  4import time
  5import json
  6import board
  7import busio
  8from digitalio import DigitalInOut
  9import neopixel
 10import adafruit_connection_manager
 11from adafruit_esp32spi import adafruit_esp32spi
 12from adafruit_esp32spi import adafruit_esp32spi_wifimanager
 13import adafruit_minimqtt.adafruit_minimqtt as MQTT
 14from adafruit_aws_iot import MQTT_CLIENT
 15
 16### WiFi ###
 17
# Get wifi details and more from a secrets.py file
# The ``secrets`` dict imported here is later handed to the WiFi manager and
# also supplies the "broker" and "client_id" entries for the MQTT client.
try:
    from secrets import secrets
except ImportError:
    # Print a hint for the user, then re-raise so the script stops here.
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise
 24
 25# Get device certificate
 26try:
 27    with open("aws_cert.pem.crt", "rb") as f:
 28        DEVICE_CERT = f.read()
 29except ImportError:
 30    print("Certificate (aws_cert.pem.crt) not found on CIRCUITPY filesystem.")
 31    raise
 32
 33# Get device private key
 34try:
 35    with open("private.pem.key", "rb") as f:
 36        DEVICE_KEY = f.read()
 37except ImportError:
 38    print("Certificate (private.pem.key) not found on CIRCUITPY filesystem.")
 39    raise
 40
# If you are using a board with pre-defined ESP32 Pins:
esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)

# If you have an externally connected ESP32:
# (use these three lines instead of the three assignments above)
# esp32_cs = DigitalInOut(board.D9)
# esp32_ready = DigitalInOut(board.D10)
# esp32_reset = DigitalInOut(board.D5)

# Create the SPI bus and the ESP32 SPI control object. ``esp`` is used further
# down for the firmware check, the set_certificate()/set_private_key() calls,
# the WiFi manager, and the socket pool / SSL context lookups.
spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
 53
 54# Verify nina-fw version >= 1.4.0
 55assert (
 56    int(bytes(esp.firmware_version).decode("utf-8")[2]) >= 4
 57), "Please update nina-fw to >=1.4.0."
 58
# Use below for Most Boards
# (a single NeoPixel on board.NEOPIXEL, used as the WiFi manager's status light)
status_light = neopixel.NeoPixel(
    board.NEOPIXEL, 1, brightness=0.2
)  # Active by default; comment out if you use one of the alternatives below
# Uncomment below for ItsyBitsy M4
# NOTE(review): ``dotstar`` is not among the imports visible in this file; an
# import providing that name would also be needed.
# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
# Uncomment below for an externally defined RGB LED
# import adafruit_rgbled
# from adafruit_esp32spi import PWMOut
# RED_LED = PWMOut.PWMOut(esp, 26)
# GREEN_LED = PWMOut.PWMOut(esp, 27)
# BLUE_LED = PWMOut.PWMOut(esp, 25)
# status_light = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)

# The WiFi manager is built from the ESP32 object, the secrets dict and the
# status light; wifi.connect() and wifi.reset() are called on it later.
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
 73
 74### Code ###
 75
# MQTT topic that the connect() callback below subscribes to.
topic = "circuitpython/aws"
 77
 78
 79# Define callback methods which are called when events occur
 80# pylint: disable=unused-argument, redefined-outer-name
 81def connect(client, userdata, flags, rc):
 82    # This function will be called when the client is connected
 83    # successfully to the broker.
 84    print("Connected to MQTT Broker!")
 85    print("Flags: {0}\n RC: {1}".format(flags, rc))
 86
 87    # Subscribe to topic circuitpython/aws
 88    print("Subscribing to topic {}".format(topic))
 89    aws_iot.subscribe(topic)
 90
 91
 92def disconnect(client, userdata, rc):
 93    # This method is called when the client disconnects
 94    # from the broker.
 95    print("Disconnected from MQTT Broker!")
 96
 97
 98def subscribe(client, userdata, topic, granted_qos):
 99    # This method is called when the client subscribes to a new topic.
100    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
101
102    # Create a json-formatted message
103    message = {"message": "Hello from AWS IoT CircuitPython"}
104    # Publish message to topic
105    aws_iot.publish(topic, json.dumps(message))
106
107
def unsubscribe(client, userdata, topic, pid):
    """Report that the client has unsubscribed from ``topic``."""
    notice = "Unsubscribed from {0} with PID {1}".format(topic, pid)
    print(notice)
111
112
def publish(client, userdata, topic, pid):
    """Report that the client has published data to ``topic``."""
    notice = "Published to {0} with PID {1}".format(topic, pid)
    print(notice)
116
117
def message(client, topic, msg):
    """Print data that the client received on ``topic``."""
    received = "Message from {}: {}".format(topic, msg)
    print(received)
121
122
# Set AWS Device Certificate
# (DEVICE_CERT holds the raw bytes read from aws_cert.pem.crt earlier in this file)
esp.set_certificate(DEVICE_CERT)

# Set AWS RSA Private Key
# (DEVICE_KEY holds the raw bytes read from private.pem.key earlier in this file)
esp.set_private_key(DEVICE_KEY)

# Connect to WiFi
# The WiFi manager was created with the secrets dict, so no arguments are needed.
print("Connecting to WiFi...")
wifi.connect()
print("Connected!")
133
# Ask the connection manager for the socket pool and SSL context that belong
# to the ``esp`` radio object; both are passed to the MQTT client below.
pool = adafruit_connection_manager.get_radio_socketpool(esp)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(esp)

# Set up a new MiniMQTT Client
# The broker address and client ID are read from the secrets dict.
client = MQTT.MQTT(
    broker=secrets["broker"],
    client_id=secrets["client_id"],
    socket_pool=pool,
    ssl_context=ssl_context,
)

# Initialize AWS IoT MQTT API Client
# ``aws_iot`` wraps the MiniMQTT client and is the object the callbacks and
# the main loop in this file call into.
aws_iot = MQTT_CLIENT(client)
147
# Connect callback handlers to AWS IoT MQTT Client
# Each attribute is set to the same-named function defined earlier in this
# file. They are assigned before aws_iot.connect() is called.
aws_iot.on_connect = connect
aws_iot.on_disconnect = disconnect
aws_iot.on_subscribe = subscribe
aws_iot.on_unsubscribe = unsubscribe
aws_iot.on_publish = publish
aws_iot.on_message = message

# Show which broker is being contacted, then open the connection.
print("Attempting to connect to %s" % client.broker)
aws_iot.connect()
158
159# Pump the message loop forever, all events
160# are handled in their callback handlers
161# while True:
162#   aws_iot.loop()
163
# Run the blocking message loop forever.
# NOTE: NO code below this loop will execute
# NOTE: Network reconnection is handled within this loop
while True:
    try:
        aws_iot.loop()
    except (ValueError, RuntimeError) as err:
        # Report the failure, reset WiFi, reconnect, and retry without pausing.
        print("Failed to get data, retrying\n", err)
        wifi.reset()
        aws_iot.reconnect()
    else:
        # Pause only after a poll that completed without an error.
        time.sleep(1)