Simple test
Ensure your device works with this simple test.
examples/gc_iot_core_simpletest.py
1# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
2# SPDX-License-Identifier: MIT
3
4import time
5from os import getenv
6
7import adafruit_connection_manager
8import adafruit_minimqtt.adafruit_minimqtt as MQTT
9import board
10import busio
11import neopixel
12from adafruit_esp32spi import adafruit_esp32spi, adafruit_esp32spi_wifimanager
13from digitalio import DigitalInOut
14
15from adafruit_gc_iot_core import MQTT_API, Cloud_Core
16
17# Get WiFi details and Google Cloud keys, ensure these are setup in settings.toml
18ssid = getenv("CIRCUITPY_WIFI_SSID")
19password = getenv("CIRCUITPY_WIFI_PASSWORD")
20cloud_region = getenv("cloud_region")
21device_id = getenv("device_id")
22private_key = getenv("private_key")
23project_id = getenv("project_id")
24registry_id = getenv("registry_id")
25jwt = getenv("jwt")
26
27### WiFi ###
28
29# If you are using a board with pre-defined ESP32 Pins:
30esp32_cs = DigitalInOut(board.ESP_CS)
31esp32_ready = DigitalInOut(board.ESP_BUSY)
32esp32_reset = DigitalInOut(board.ESP_RESET)
33
34# If you have an externally connected ESP32:
35# esp32_cs = DigitalInOut(board.D9)
36# esp32_ready = DigitalInOut(board.D10)
37# esp32_reset = DigitalInOut(board.D5)
38
39spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
40esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
41"""Use below for Most Boards"""
42status_pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2) # Uncomment for Most Boards
43"""Uncomment below for ItsyBitsy M4"""
44# status_pixel = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
45# Uncomment below for an externally defined RGB LED
46# import adafruit_rgbled
47# from adafruit_esp32spi import PWMOut
48# RED_LED = PWMOut.PWMOut(esp, 26)
49# GREEN_LED = PWMOut.PWMOut(esp, 27)
50# BLUE_LED = PWMOut.PWMOut(esp, 25)
51# status_pixel = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)
52wifi = adafruit_esp32spi_wifimanager.WiFiManager(esp, ssid, password, status_pixel=status_pixel)
53
54### Code ###
55
56
57# Define callback methods which are called when events occur
58def connect(client, userdata, flags, rc):
59 # This function will be called when the client is connected
60 # successfully to the broker.
61 print("Connected to MQTT Broker!")
62 print(f"Flags: {flags}\n RC: {rc}")
63 # Subscribes to commands/# topic
64 google_mqtt.subscribe_to_all_commands()
65
66 # Publish to the default "events" topic
67 google_mqtt.publish("testing", "events", qos=1)
68
69
70def disconnect(client, userdata, rc):
71 # This method is called when the client disconnects
72 # from the broker.
73 print("Disconnected from MQTT Broker!")
74
75
76def subscribe(client, userdata, topic, granted_qos):
77 # This method is called when the client subscribes to a new topic.
78 print(f"Subscribed to {topic} with QOS level {granted_qos}")
79
80
81def unsubscribe(client, userdata, topic, pid):
82 # This method is called when the client unsubscribes from a topic.
83 print(f"Unsubscribed from {topic} with PID {pid}")
84
85
86def publish(client, userdata, topic, pid):
87 # This method is called when the client publishes data to a topic.
88 print(f"Published to {topic} with PID {pid}")
89
90
91def message(client, topic, msg):
92 # This method is called when the client receives data from a topic.
93 print(f"Message from {topic}: {msg}")
94
95
96# Connect to WiFi
97print("Connecting to WiFi...")
98wifi.connect()
99print("Connected!")
100
101pool = adafruit_connection_manager.get_radio_socketpool(esp)
102ssl_context = adafruit_connection_manager.get_radio_ssl_context(esp)
103
104# Initialize Google Cloud IoT Core interface
105settings = {
106 cloud_region: cloud_region,
107 device_id: device_id,
108 private_key: private_key,
109 project_id: project_id,
110 registry_id: registry_id,
111}
112google_iot = Cloud_Core(esp, settings)
113
114# Optional JSON-Web-Token (JWT) Generation
115# print("Generating JWT...")
116# jwt = google_iot.generate_jwt()
117# print("Your JWT is: ", jwt)
118
119# Set up a new MiniMQTT Client
120client = MQTT.MQTT(
121 broker=google_iot.broker,
122 username=google_iot.username,
123 password=jwt,
124 client_id=google_iot.cid,
125 socket_pool=pool,
126 ssl_context=ssl_context,
127)
128
129# Initialize Google MQTT API Client
130google_mqtt = MQTT_API(client)
131
132# Connect callback handlers to Google MQTT Client
133google_mqtt.on_connect = connect
134google_mqtt.on_disconnect = disconnect
135google_mqtt.on_subscribe = subscribe
136google_mqtt.on_unsubscribe = unsubscribe
137google_mqtt.on_publish = publish
138google_mqtt.on_message = message
139
140
141print(f"Attempting to connect to {client.broker}")
142google_mqtt.connect()
143
144# Pump the message loop forever, all events are
145# handled in their callback handlers
146# while True:
147# google_mqtt.loop()
148
149# Start a blocking message loop...
150# NOTE: NO code below this loop will execute
151# NOTE: Network reconnection is handled within this loop
152while True:
153 try:
154 google_mqtt.loop()
155 except (ValueError, RuntimeError) as e:
156 print("Failed to get data, retrying\n", e)
157 wifi.reset()
158 google_mqtt.reconnect()
159 continue
160 time.sleep(1)