Simple test

Ensure your device works with this simple test.

examples/adafruit_io_simpletest.py
  1# SPDX-FileCopyrightText: Tony DiCola for Adafruit Industries
  2# SPDX-FileCopyrightText: 2019 Adafruit Industries for Adafruit Industries
  3# SPDX-License-Identifier: MIT
  4
  5# Example of using the Adafruit IO CircuitPython MQTT client
  6# to subscribe to an Adafruit IO feed and publish random data
  7# to be received by the feed.
  8import time
  9from os import getenv
 10from random import randint
 11
 12import adafruit_connection_manager
 13import adafruit_minimqtt.adafruit_minimqtt as MQTT
 14import board
 15import busio
 16import neopixel
 17from adafruit_esp32spi import adafruit_esp32spi, adafruit_esp32spi_wifimanager
 18from digitalio import DigitalInOut
 19
 20from adafruit_io.adafruit_io import IO_MQTT
 21
 22# Get WiFi details and Adafruit IO keys; ensure these are set up in settings.toml
 23# (visit io.adafruit.com if you need to create an account, or if you need your Adafruit IO key.)
 24ssid = getenv("CIRCUITPY_WIFI_SSID")
 25password = getenv("CIRCUITPY_WIFI_PASSWORD")
 26aio_username = getenv("ADAFRUIT_AIO_USERNAME")
 27aio_key = getenv("ADAFRUIT_AIO_KEY")
 28
 29### WiFi ###
 30
 31# If you are using a board with pre-defined ESP32 Pins:
 32esp32_cs = DigitalInOut(board.ESP_CS)
 33esp32_ready = DigitalInOut(board.ESP_BUSY)
 34esp32_reset = DigitalInOut(board.ESP_RESET)
 35
 36# If you have an externally connected ESP32:
 37# esp32_cs = DigitalInOut(board.D9)
 38# esp32_ready = DigitalInOut(board.D10)
 39# esp32_reset = DigitalInOut(board.D5)
 40
 41spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
 42esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
 43"""Use below for Most Boards"""
 44status_pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)  # Active default: on-board NeoPixel
 45"""Uncomment below for ItsyBitsy M4"""
 46# status_pixel = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)  # NOTE: no dotstar import is present above; add one first
 47# Uncomment below for an externally defined RGB LED
 48# import adafruit_rgbled
 49# from adafruit_esp32spi import PWMOut
 50# RED_LED = PWMOut.PWMOut(esp, 26)
 51# GREEN_LED = PWMOut.PWMOut(esp, 27)
 52# BLUE_LED = PWMOut.PWMOut(esp, 25)
 53# status_pixel = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)
 54wifi = adafruit_esp32spi_wifimanager.WiFiManager(esp, ssid, password, status_pixel=status_pixel)
 55
 56
 57# Define callback functions which will be called when certain events happen.
 58def connected(client):
 59    # Called when the client is connected to Adafruit IO (assigned to
 60    # io.on_connect below). This is a good place to subscribe to feed changes.
 61    # The client parameter passed to this function is the Adafruit IO MQTT
 62    # client, so calls can be made against it directly.
 63    print("Connected to Adafruit IO!  Listening for DemoFeed changes...")
 64    # Subscribe to changes on a feed named DemoFeed.
 65    client.subscribe("DemoFeed")
 66
 67
 68def subscribe(client, userdata, topic, granted_qos):
 69    # Callback (assigned to io.on_subscribe below) called when the client subscribes to a new feed; prints the topic and QOS level.
 70    print(f"Subscribed to {topic} with QOS level {granted_qos}")
 71
 72
 73def unsubscribe(client, userdata, topic, pid):
 74    # Callback (assigned to io.on_unsubscribe below) called when the client unsubscribes from a feed; prints the topic and PID.
 75    print(f"Unsubscribed from {topic} with PID {pid}")
 76
 77
 78def disconnected(client):
 79    # Callback (assigned to io.on_disconnect below) called when the client disconnects; it only prints a notice.
 80    print("Disconnected from Adafruit IO!")
 81
 82
 83def publish(client, userdata, topic, pid):
 84    """Callback called when the client publishes data to a feed; prints the topic, the PID and, when not None, the user data."""
 85    print(f"Published to {topic} with PID {pid}")
 86    if userdata is not None:
 87        print("Published User data: ", end="")
 88        print(userdata)
 89
 90
 91def message(client, feed_id, payload):
 92    # Callback (assigned to io.on_message below) called when a subscribed feed
 93    # has a new value. The feed_id parameter identifies the feed, and the
 94    # payload parameter has the new value; both are printed.
 95    print(f"Feed {feed_id} received new value: {payload}")
 96
 97
 98# Connect to WiFi
 99print("Connecting to WiFi...")
100wifi.connect()
101print("Connected!")
102# Socket pool and SSL context for the ESP32 radio; both are handed to the MQTT client below.
103pool = adafruit_connection_manager.get_radio_socketpool(esp)
104ssl_context = adafruit_connection_manager.get_radio_ssl_context(esp)
105
106# Initialize a new MQTT Client object
107mqtt_client = MQTT.MQTT(
108    broker="io.adafruit.com",
109    port=1883,  # NOTE(review): 1883 is the standard non-TLS MQTT port although ssl_context is passed; confirm intended
110    username=aio_username,
111    password=aio_key,
112    socket_pool=pool,
113    ssl_context=ssl_context,
114)
115
116
117# Initialize an Adafruit IO MQTT Client wrapping the MQTT client created above
118io = IO_MQTT(mqtt_client)
119
120# Connect the callback methods defined above to Adafruit IO
121io.on_connect = connected
122io.on_disconnect = disconnected
123io.on_subscribe = subscribe
124io.on_unsubscribe = unsubscribe
125io.on_message = message
126io.on_publish = publish
127
128# Connect to Adafruit IO
129print("Connecting to Adafruit IO...")
130io.connect()
131
132# Below is an example of manually publishing a new  value to Adafruit IO.
133last = 0
134print("Publishing a new message every 10 seconds...")
135while True:
136    # Explicitly pump the message loop.
137    io.loop()
138    # Send a new message every 10 seconds.
139    if (time.monotonic() - last) >= 5:
140        value = randint(0, 100)
141        print(f"Publishing {value} to DemoFeed.")
142        io.publish("DemoFeed", value)
143        last = time.monotonic()