Simple test

Ensure your device works with this simple test.

examples/adafruit_io_simpletest.py
  1# SPDX-FileCopyrightText: Tony DiCola for Adafruit Industries
  2# SPDX-FileCopyrightText: 2019 Adafruit Industries for Adafruit Industries
  3# SPDX-License-Identifier: MIT
  4
  5# Example of using the Adafruit IO CircuitPython MQTT client
  6# to subscribe to an Adafruit IO feed and publish random data
  7# to be received by the feed.
  8import time
  9from random import randint
 10
 11
 12import board
 13import busio
 14from digitalio import DigitalInOut
 15from adafruit_esp32spi import adafruit_esp32spi
 16from adafruit_esp32spi import adafruit_esp32spi_wifimanager
 17import adafruit_esp32spi.adafruit_esp32spi_socket as socket
 18import neopixel
 19import adafruit_minimqtt.adafruit_minimqtt as MQTT
 20from adafruit_io.adafruit_io import IO_MQTT
 21
 22### WiFi ###
 23
 24# Get wifi details and more from a secrets.py file
 25try:
 26    from secrets import secrets
 27except ImportError:
 28    print("WiFi secrets are kept in secrets.py, please add them there!")
 29    raise
 30
 31# If you are using a board with pre-defined ESP32 Pins:
 32esp32_cs = DigitalInOut(board.ESP_CS)
 33esp32_ready = DigitalInOut(board.ESP_BUSY)
 34esp32_reset = DigitalInOut(board.ESP_RESET)
 35
 36# If you have an externally connected ESP32:
 37# esp32_cs = DigitalInOut(board.D9)
 38# esp32_ready = DigitalInOut(board.D10)
 39# esp32_reset = DigitalInOut(board.D5)
 40
 41spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
 42esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
 43"""Use below for Most Boards"""
 44status_light = neopixel.NeoPixel(
 45    board.NEOPIXEL, 1, brightness=0.2
 46)  # Uncomment for Most Boards
 47"""Uncomment below for ItsyBitsy M4"""
 48# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
 49# Uncomment below for an externally defined RGB LED
 50# import adafruit_rgbled
 51# from adafruit_esp32spi import PWMOut
 52# RED_LED = PWMOut.PWMOut(esp, 26)
 53# GREEN_LED = PWMOut.PWMOut(esp, 27)
 54# BLUE_LED = PWMOut.PWMOut(esp, 25)
 55# status_light = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)
 56wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
 57
 58# Define callback functions which will be called when certain events happen.
 59# pylint: disable=unused-argument
 60def connected(client):
 61    # Connected function will be called when the client is connected to Adafruit IO.
 62    # This is a good place to subscribe to feed changes.  The client parameter
 63    # passed to this function is the Adafruit IO MQTT client so you can make
 64    # calls against it easily.
 65    print("Connected to Adafruit IO!  Listening for DemoFeed changes...")
 66    # Subscribe to changes on a feed named DemoFeed.
 67    client.subscribe("DemoFeed")
 68
 69
 70def subscribe(client, userdata, topic, granted_qos):
 71    # This method is called when the client subscribes to a new feed.
 72    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
 73
 74
 75def unsubscribe(client, userdata, topic, pid):
 76    # This method is called when the client unsubscribes from a feed.
 77    print("Unsubscribed from {0} with PID {1}".format(topic, pid))
 78
 79
 80# pylint: disable=unused-argument
 81def disconnected(client):
 82    # Disconnected function will be called when the client disconnects.
 83    print("Disconnected from Adafruit IO!")
 84
 85
 86# pylint: disable=unused-argument
 87def message(client, feed_id, payload):
 88    # Message function will be called when a subscribed feed has a new value.
 89    # The feed_id parameter identifies the feed, and the payload parameter has
 90    # the new value.
 91    print("Feed {0} received new value: {1}".format(feed_id, payload))
 92
 93
 94# Connect to WiFi
 95print("Connecting to WiFi...")
 96wifi.connect()
 97print("Connected!")
 98
 99# Initialize MQTT interface with the esp interface
100MQTT.set_socket(socket, esp)
101
102# Initialize a new MQTT Client object
103mqtt_client = MQTT.MQTT(
104    broker="io.adafruit.com",
105    port=1883,
106    username=secrets["aio_username"],
107    password=secrets["aio_key"],
108)
109
110
111# Initialize an Adafruit IO MQTT Client
112io = IO_MQTT(mqtt_client)
113
114# Connect the callback methods defined above to Adafruit IO
115io.on_connect = connected
116io.on_disconnect = disconnected
117io.on_subscribe = subscribe
118io.on_unsubscribe = unsubscribe
119io.on_message = message
120
121# Connect to Adafruit IO
122print("Connecting to Adafruit IO...")
123io.connect()
124
125# Below is an example of manually publishing a new  value to Adafruit IO.
126last = 0
127print("Publishing a new message every 10 seconds...")
128while True:
129    # Explicitly pump the message loop.
130    io.loop()
131    # Send a new message every 10 seconds.
132    if (time.monotonic() - last) >= 5:
133        value = randint(0, 100)
134        print("Publishing {0} to DemoFeed.".format(value))
135        io.publish("DemoFeed", value)
136        last = time.monotonic()