Simple test
Ensure your device works with this simple test.
examples/adafruit_io_simpletest.py
1# SPDX-FileCopyrightText: Tony DiCola for Adafruit Industries
2# SPDX-FileCopyrightText: 2019 Adafruit Industries for Adafruit Industries
3# SPDX-License-Identifier: MIT
4
5# Example of using the Adafruit IO CircuitPython MQTT client
6# to subscribe to an Adafruit IO feed and publish random data
7# to be received by the feed.
8import time
9from os import getenv
10from random import randint
11
12import adafruit_connection_manager
13import adafruit_minimqtt.adafruit_minimqtt as MQTT
14import board
15import busio
16import neopixel
17from adafruit_esp32spi import adafruit_esp32spi, adafruit_esp32spi_wifimanager
18from digitalio import DigitalInOut
19
20from adafruit_io.adafruit_io import IO_MQTT
21
# Get WiFi details and Adafruit IO keys, ensure these are setup in settings.toml
# (visit io.adafruit.com if you need to create an account, or if you need your Adafruit IO key.)
# getenv() returns None for any key that is not defined.
ssid = getenv("CIRCUITPY_WIFI_SSID")
password = getenv("CIRCUITPY_WIFI_PASSWORD")
aio_username = getenv("ADAFRUIT_AIO_USERNAME")
aio_key = getenv("ADAFRUIT_AIO_KEY")
28
29### WiFi ###
30
# If you are using a board with pre-defined ESP32 Pins:
esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)

# If you have an externally connected ESP32:
# esp32_cs = DigitalInOut(board.D9)
# esp32_ready = DigitalInOut(board.D10)
# esp32_reset = DigitalInOut(board.D5)

spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)

# Status indicator handed to the WiFi manager below. Keep exactly one of the
# following `status_pixel` definitions active.
# Use below for most boards:
status_pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
# Uncomment below for ItsyBitsy M4
# (NOTE: `dotstar` is not imported by this script; a DotStar driver must be
# imported under that name first.)
# status_pixel = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
# Uncomment below for an externally defined RGB LED
# import adafruit_rgbled
# from adafruit_esp32spi import PWMOut
# RED_LED = PWMOut.PWMOut(esp, 26)
# GREEN_LED = PWMOut.PWMOut(esp, 27)
# BLUE_LED = PWMOut.PWMOut(esp, 25)
# status_pixel = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)
wifi = adafruit_esp32spi_wifimanager.WiFiManager(esp, ssid, password, status_pixel=status_pixel)
55
56
57# Define callback functions which will be called when certain events happen.
def connected(client):
    """Handle a successful connection to Adafruit IO.

    This is a good place to subscribe to feed changes, so the handler
    subscribes to the feed named DemoFeed.

    :param client: The Adafruit IO MQTT client that just connected; calls
        can be made against it directly.
    """
    print("Connected to Adafruit IO! Listening for DemoFeed changes...")
    # Subscribe to changes on a feed named DemoFeed.
    client.subscribe("DemoFeed")
66
67
def subscribe(client, userdata, topic, granted_qos):
    """Report that the client subscribed to a new feed.

    :param client: The client that subscribed (unused here).
    :param userdata: User data supplied with the callback (unused here).
    :param topic: The topic that was subscribed to.
    :param granted_qos: The QOS level granted for the subscription.
    """
    print(f"Subscribed to {topic} with QOS level {granted_qos}")
71
72
def unsubscribe(client, userdata, topic, pid):
    """Report that the client unsubscribed from a feed.

    :param client: The client that unsubscribed (unused here).
    :param userdata: User data supplied with the callback (unused here).
    :param topic: The topic that was unsubscribed from.
    :param pid: The PID reported with the unsubscribe event.
    """
    print(f"Unsubscribed from {topic} with PID {pid}")
76
77
def disconnected(client):
    """Report that the client disconnected from Adafruit IO.

    :param client: The client that disconnected (unused here).
    """
    print("Disconnected from Adafruit IO!")
81
82
def publish(client, userdata, topic, pid):
    """Report that the client published data to a feed.

    :param client: The client that published (unused here).
    :param userdata: User data supplied with the callback; printed on its
        own line when it is not None.
    :param topic: The topic that was published to.
    :param pid: The PID reported with the publish event.
    """
    print(f"Published to {topic} with PID {pid}")
    if userdata is not None:
        # One print with the default single-space separator emits the same
        # text as a label print with end="" followed by a value print.
        print("Published User data:", userdata)
89
90
def message(client, feed_id, payload):
    """Report a new value received on a subscribed feed.

    :param client: The client that received the message (unused here).
    :param feed_id: Identifies the feed that has a new value.
    :param payload: The new value of the feed.
    """
    print(f"Feed {feed_id} received new value: {payload}")
96
97
# Connect to WiFi
print("Connecting to WiFi...")
wifi.connect()
print("Connected!")

# Socket pool and SSL context obtained for the ESP32 radio; both are handed
# to the MQTT client created below.
pool = adafruit_connection_manager.get_radio_socketpool(esp)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(esp)
105
# Initialize a new MQTT Client object
# NOTE(review): port 1883 is conventionally the unencrypted MQTT port, yet an
# ssl_context is also supplied. Confirm whether this connection (which carries
# the Adafruit IO key) is actually encrypted, or whether the TLS port should be
# used instead.
mqtt_client = MQTT.MQTT(
    broker="io.adafruit.com",
    port=1883,
    username=aio_username,
    password=aio_key,
    socket_pool=pool,
    ssl_context=ssl_context,
)
115
116
# Initialize an Adafruit IO MQTT Client
io = IO_MQTT(mqtt_client)

# Connect the callback methods defined above to Adafruit IO
io.on_connect = connected
io.on_disconnect = disconnected
io.on_subscribe = subscribe
io.on_unsubscribe = unsubscribe
io.on_message = message
io.on_publish = publish

# Connect to Adafruit IO
print("Connecting to Adafruit IO...")
io.connect()
131
# Below is an example of manually publishing a new value to Adafruit IO.
# Seconds between publishes. The banner and the loop check both use this one
# value, so the announced interval always matches the real one.
PUBLISH_INTERVAL = 10
last = 0
print(f"Publishing a new message every {PUBLISH_INTERVAL} seconds...")
while True:
    # Explicitly pump the message loop.
    io.loop()
    # Send a new message once PUBLISH_INTERVAL seconds have elapsed.
    if (time.monotonic() - last) >= PUBLISH_INTERVAL:
        value = randint(0, 100)
        print(f"Publishing {value} to DemoFeed.")
        io.publish("DemoFeed", value)
        last = time.monotonic()