Simple test

Ensure your device works with this simple test.

examples/ble_file_transfer_simpletest.py
  1# SPDX-FileCopyrightText: 2020 ladyada for Adafruit Industries
  2# SPDX-License-Identifier: MIT
  3
  4"""
  5Exercises a peer's BLE file transfer service: scans for and connects to a device advertising it, then tests write, read, mkdir, listdir, delete, and move.
  6"""
  7
  8import binascii
  9import random
 10import time
 11
 12from adafruit_ble import BLERadio
 13from adafruit_ble.advertising.standard import (
 14    ProvideServicesAdvertisement,
 15    Advertisement,
 16)
 17import adafruit_ble_file_transfer
 18
 19
 20def _write(client, filename, contents, *, offset=0):
 21    # pylint: disable=redefined-outer-name
 22    start = time.monotonic()
 23    try:
 24        client.write(filename, contents, offset=offset)
 25        duration = time.monotonic() - start
 26        client = wait_for_reconnect()
 27    except RuntimeError:
 28        print("write failed. is usb connected?")
 29        return client
 30    print("wrote", filename, "at rate", len(contents) / duration, "B/s")
 31    return client
 32
 33
 34def _read(client, filename, *, offset=0):
 35    # pylint: disable=redefined-outer-name
 36    start = time.monotonic()
 37    try:
 38        contents = client.read(filename, offset=offset)
 39        duration = time.monotonic() - start
 40    except ValueError:
 41        print("missing file:", filename)
 42        return b""
 43    print("read", filename, "at rate", len(contents) / duration, "B/s")
 44    return contents
 45
 46
 47ble = BLERadio()  # radio used by wait_for_reconnect() and the main loop to connect and scan
 48
 49peer_address = None  # set by the scan loop after connecting; wait_for_reconnect() passes it to ble.connect
 50
 51
 52def wait_for_reconnect():
 53    print("reconnecting", end="")
 54    while ble.connected:
 55        pass
 56    print(".", end="")
 57    new_connection = ble.connect(peer_address)
 58    print(".", end="")
 59    if not new_connection.paired:
 60        print(".", end="")
 61        new_connection.pair()
 62    new_service = new_connection[adafruit_ble_file_transfer.FileTransferService]
 63    new_client = adafruit_ble_file_transfer.FileTransferClient(new_service)
 64    print(".", end="")
 65    time.sleep(2)
 66    print("done")
 67    return new_client
 68
 69
 70# ble._adapter.erase_bonding()
 71# print("erased")
 72while True:  # forever: run the test sequence while connected, then scan for a peer to connect to
 73    try:
 74        while ble.connected:
 75            for connection in ble.connections:
 76                # pylint: disable=redefined-outer-name
 77                if adafruit_ble_file_transfer.FileTransferService not in connection:
 78                    continue  # skip connections that do not offer the file transfer service
 79                if not connection.paired:
 80                    print("pairing")
 81                    connection.pair()
 82                print("paired")
 83                print()
 84                service = connection[adafruit_ble_file_transfer.FileTransferService]
 85                client = adafruit_ble_file_transfer.FileTransferClient(service)
 86
 87                print("Testing write")
 88                client = _write(client, "/hello.txt", "Hello world".encode("utf-8"))  # keep the client _write returns
 89                time.sleep(1)
 90                c = _read(client, "/hello.txt")
 91                print(len(c), c)
 92                print()
 93
 94                print("Testing mkdir")
 95                try:
 96                    client.mkdir("/world/")
 97                except ValueError:
 98                    print("path exists or isn't valid")
 99                print(client.listdir("/"))
100                print(client.listdir("/world/"))
101                print()
102
103                print("Test writing within dir")
104                client = _write(client, "/world/hi.txt", "Hi world".encode("utf-8"))
105
106                hello_world = "Hello world".encode("utf-8")
107                client = _write(client, "/world/hello.txt", hello_world)
108                c = _read(client, "/world/hello.txt")
109                print(c)
110                print()
111
112                # Test offsets
113                print("Testing offsets")
114                hello = len("Hello ".encode("utf-8"))  # byte length of the "Hello " prefix, used as the offset
115                c = _read(client, "/world/hello.txt", offset=hello)
116                print(c)
117
118                client = _write(
119                    client, "/world/hello.txt", "offsets!".encode("utf-8"), offset=hello
120                )
121                c = _read(client, "/world/hello.txt", offset=0)
122                print(c)
123                print()
124
125                # Test deleting
126                print("Testing delete in /world/")
127                print(client.listdir("/world/"))
128                try:
129                    client.delete("/world/hello.txt")
130                except ValueError:
131                    print("delete failed")
132
133                try:
134                    client.delete("/world/")  # NOTE(review): /world/hi.txt has not been deleted yet at this point
135                    print("deleted /world/")
136                except ValueError:
137                    print("delete failed")
138                print(client.listdir("/world/"))
139                try:  # best-effort cleanup: ValueError from these two deletes is ignored
140                    client.delete("/world/hi.txt")
141                except ValueError:
142                    pass
143                try:
144                    client.delete("/world/")
145                except ValueError:
146                    pass
147                print()
148
149                # Test move
150                print("Testing move")
151                print(client.listdir("/"))
152                try:
153                    client.move("/hello.txt", "/world/hi.txt")  # NOTE(review): /world/ was deleted above if cleanup succeeded; presumably expected to fail - confirm
154                except ValueError:
155                    pass
156                try:
157                    client.move("/hello.txt", "/hi.txt")
158                except ValueError:
159                    print("move failed")
160                print(client.listdir("/"))
161                client.delete("/hi.txt")  # NOTE(review): not wrapped in try; a ValueError here is not handled by the except ConnectionError below
162                print()
163
164                # Test larger files
165                print("Testing larger files")
166                large_1k = bytearray(1024)
167                for i, _ in enumerate(large_1k):  # fill the 1024-byte buffer with random byte values
168                    large_1k[i] = random.randint(0, 255)
169                client = _write(client, "/random.txt", large_1k)
170                contents = _read(client, "/random.txt")
171                if large_1k != contents:  # dump both buffers as hex before failing
172                    print(binascii.hexlify(large_1k))
173                    print(binascii.hexlify(contents))
174                    raise RuntimeError("large contents don't match!")  # not caught by the except ConnectionError below
175                print()
176            time.sleep(20)  # wait 20 seconds before checking ble.connected and running the tests again
177    except ConnectionError as e:  # e is unused; execution falls through to the scan below
178        pass
179
180    print("disconnected, scanning")
181    for advertisement in ble.start_scan(
182        ProvideServicesAdvertisement, Advertisement, timeout=1
183    ):
184        # print(advertisement.address, advertisement.address.type)
185        if (
186            not hasattr(advertisement, "services")
187            or adafruit_ble_file_transfer.FileTransferService
188            not in advertisement.services
189        ):
190            continue  # skip advertisements with no services attribute or without the file transfer service
191        ble.connect(advertisement)
192        peer_address = advertisement.address  # remembered so wait_for_reconnect() can reconnect to this peer
193        print("connected to", advertisement.address)
194        break  # stop at the first matching peer
195    ble.stop_scan()