Simple test

Ensure your device works with this simple test.

examples/adafruit_io_simpletest.py
# SPDX-FileCopyrightText: Tony DiCola for Adafruit Industries
# SPDX-FileCopyrightText: 2019 Adafruit Industries for Adafruit Industries
# SPDX-License-Identifier: MIT

# Example of using the Adafruit IO CircuitPython MQTT client
# to subscribe to an Adafruit IO feed and publish random data
# to be received by the feed.
  8import time
  9from random import randint
 10
 11
 12import board
 13import busio
 14from digitalio import DigitalInOut
 15from adafruit_esp32spi import adafruit_esp32spi
 16from adafruit_esp32spi import adafruit_esp32spi_wifimanager
 17import adafruit_esp32spi.adafruit_esp32spi_socket as socket
 18import neopixel
 19import adafruit_minimqtt.adafruit_minimqtt as MQTT
 20from adafruit_io.adafruit_io import IO_MQTT
 21
 22### WiFi ###
 23
 24# Get wifi details and more from a secrets.py file
 25try:
 26    from secrets import secrets
 27except ImportError:
 28    print("WiFi secrets are kept in secrets.py, please add them there!")
 29    raise
 30
 31# If you are using a board with pre-defined ESP32 Pins:
 32esp32_cs = DigitalInOut(board.ESP_CS)
 33esp32_ready = DigitalInOut(board.ESP_BUSY)
 34esp32_reset = DigitalInOut(board.ESP_RESET)
 35
 36# If you have an externally connected ESP32:
 37# esp32_cs = DigitalInOut(board.D9)
 38# esp32_ready = DigitalInOut(board.D10)
 39# esp32_reset = DigitalInOut(board.D5)
 40
 41spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
 42esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
 43"""Use below for Most Boards"""
 44status_light = neopixel.NeoPixel(
 45    board.NEOPIXEL, 1, brightness=0.2
 46)  # Uncomment for Most Boards
 47"""Uncomment below for ItsyBitsy M4"""
 48# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
 49# Uncomment below for an externally defined RGB LED
 50# import adafruit_rgbled
 51# from adafruit_esp32spi import PWMOut
 52# RED_LED = PWMOut.PWMOut(esp, 26)
 53# GREEN_LED = PWMOut.PWMOut(esp, 27)
 54# BLUE_LED = PWMOut.PWMOut(esp, 25)
 55# status_light = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)
 56wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
 57
 58
 59# Define callback functions which will be called when certain events happen.
 60# pylint: disable=unused-argument
 61def connected(client):
 62    # Connected function will be called when the client is connected to Adafruit IO.
 63    # This is a good place to subscribe to feed changes.  The client parameter
 64    # passed to this function is the Adafruit IO MQTT client so you can make
 65    # calls against it easily.
 66    print("Connected to Adafruit IO!  Listening for DemoFeed changes...")
 67    # Subscribe to changes on a feed named DemoFeed.
 68    client.subscribe("DemoFeed")
 69
 70
 71def subscribe(client, userdata, topic, granted_qos):
 72    # This method is called when the client subscribes to a new feed.
 73    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
 74
 75
 76def unsubscribe(client, userdata, topic, pid):
 77    # This method is called when the client unsubscribes from a feed.
 78    print("Unsubscribed from {0} with PID {1}".format(topic, pid))
 79
 80
 81# pylint: disable=unused-argument
 82def disconnected(client):
 83    # Disconnected function will be called when the client disconnects.
 84    print("Disconnected from Adafruit IO!")
 85
 86
 87# pylint: disable=unused-argument
 88def message(client, feed_id, payload):
 89    # Message function will be called when a subscribed feed has a new value.
 90    # The feed_id parameter identifies the feed, and the payload parameter has
 91    # the new value.
 92    print("Feed {0} received new value: {1}".format(feed_id, payload))
 93
 94
 95# Connect to WiFi
 96print("Connecting to WiFi...")
 97wifi.connect()
 98print("Connected!")
 99
100# Initialize MQTT interface with the esp interface
101MQTT.set_socket(socket, esp)
102
103# Initialize a new MQTT Client object
104mqtt_client = MQTT.MQTT(
105    broker="io.adafruit.com",
106    port=1883,
107    username=secrets["aio_username"],
108    password=secrets["aio_key"],
109)
110
111
112# Initialize an Adafruit IO MQTT Client
113io = IO_MQTT(mqtt_client)
114
115# Connect the callback methods defined above to Adafruit IO
116io.on_connect = connected
117io.on_disconnect = disconnected
118io.on_subscribe = subscribe
119io.on_unsubscribe = unsubscribe
120io.on_message = message
121
122# Connect to Adafruit IO
123print("Connecting to Adafruit IO...")
124io.connect()
125
# Below is an example of manually publishing a new value to Adafruit IO.

# Seconds between publishes. The status message and the timing check both
# read this value, so the announced interval always matches the real one.
PUBLISH_INTERVAL = 10

# Timestamp (time.monotonic()) of the most recent publish; 0 = none yet.
last = 0
print("Publishing a new message every {0} seconds...".format(PUBLISH_INTERVAL))
while True:
    # Explicitly pump the message loop.
    io.loop()
    # Send a new message once PUBLISH_INTERVAL seconds have elapsed.
    if (time.monotonic() - last) >= PUBLISH_INTERVAL:
        value = randint(0, 100)
        print("Publishing {0} to DemoFeed.".format(value))
        io.publish("DemoFeed", value)
        last = time.monotonic()