Raspberry Pi Pico WでMicroPythonのasyncioモジュールを使用してサーボモータ等の並列動作を行う実験

,

Raspberry Pi Pico WのプログラムをMicroPythonで作成し、マルチタスク処理が可能なasyncioモジュールを使用して模型用のサーボモータ等を同時並列動作させる実験を紹介します。

1.Raspberry Pi Pico Wに関して

今回サーボモータ等を駆動するコントローラとして、マイコンボード Raspberry Pi Pico W を使用しました。

図1にRaspberry Pi Pico Wの外観と特徴に関して記載します。

図1.Raspberry Pi Pico Wの外観等

2.MicroPythonに関して

Raspberry Pi Pico Wのプログラム言語としてMicroPythonを使用しました。
MicroPythonをRaspberry Pi Pico Wで使用するための手順を以下に説明します。

1.Raspberry Pi Pico WとパソコンをBOOTSELボタンを押しながらUSBケーブルを使用して接続します。Raspberry Pi Pico Wがディスクドライブとして認識されます。

図2.Pico WのBOOTSELボタンとUSBコネクタ

2.エクスプローラで見て、「RPI-RP2」がRaspberry Pi Pico Wのディスクドライブです。
MicroPythonのダウンロードページ(https://MicroPython.org/download/)
から、Raspberry Pi Pico W用のファームウェアをダウンロードします。

今回ダウンロードしたファームウェアはバージョン1.25.0でファイル名称は、
RPI_PICO_W-20250415-v1.25.0.uf2
でした。
このファイルを「RPI-RP2」ディスクにコピーします。これで、Raspberry Pi Pico WでMicroPythonが動作するようになります。


3.MicroPythonのプログラム作成ツールとしてThonnyをパソコンにインストールします。

Thonnyのダウンロードページは、
https://thonny.org/
です。

Raspberry Pi Pico WとパソコンをUSBケーブルで接続して、Thonnyを起動することによって、プログラムの編集や実行を行うことが出来ます。

図3はThonnyの画面です。

図3.Thonny画面

3.MicroPythonの実行手順

MicroPythonで作成したプログラムをRaspberry Pi Pico Wで実行する手順に関して以下で説明します。

1.パソコンでThonnyを起動して、Raspberry Pi Pico WをUSBケーブルで接続します。

2.Thonny画面の右下部をクリックして、「MicroPython (Raspberry Pi Pico)—」を選択します。


3.プログラムを書きます。


4.画面左上のをクリックするとプログラムがRaspberry Pi Pico Wで実行されます。

4.asyncioモジュールに関して

MicroPythonのasyncioはasync, await構文を使用して、非同期処理のコードを書くライブラリです。
非同期処理は複数のタスクが同時に実行(並列実行)される処理です。

【コルーチンに関して】
async def で宣言された関数をコルーチンと呼びます。これは非同期処理できる関数です。
下にプログラム例を示します。

このプログラムの1行目のimport asyncioで、asyncioモジュールをインポートしています。
3行目のasync def main():はコルーチンの宣言です。
5行目のawaitは、別の非同期処理(コルーチン)が終了するのを待ちます。この例ではawait asyncio.sleep(1)で1秒のスリープが完了するまで待ちます。

8行目のasyncio.run(main())でコルーチンを起動しています。

import asyncio

async def main():
  print("Test")
  await asyncio.sleep(1)
  print("End")
  
asyncio.run(main())

5.asyncioモジュールの主な構文、関数等

asyncモジュールで良く使用する構文、関数、クラスを以下に示します。

◎async, await構文
asyncioモジュールを使用したプログラムでは、async, await構文を使用して非同期処理を行います。

項目内容
async関数宣言のdefの前に付けることによって、非同期関数(コルーチン)を定義できます。非同期関数は複数の関数を、ほぼ同時に実行することができる関数です。
(例)
async def main():
print(“hellow”)
await非同期関数内でawaitで指定した他の非同期処理が終了するまで待ちます。
(例)
await asyncio.sleep(1) #1秒待つ

◎関数
asyncioモジュールを使用して非同期処理を行うのに、良く使用する関数を下に示します。

項目内容
asyncio.run(coro)指定した非同期関数(コルーチン)からタスクを作成して実行します。
引数
coro: 非同期関数(コルーチン)
asyncio.create_task(coro)指定したコルーチンからタスクを作成して実行をスケジューリングします。指定したコルーチンはスケジューリングに従って実行されます。
引数
coro: 非同期関数(コルーチン)
asyncio.sleep(t)t秒間スリープします。
これは非同期関数(コルーチン)なので、通常awaitを付けて使用します。
(例)
await asyncio.sleep(1)
asyncio.sleep_ms(t)t msecスリープします。
これは非同期関数(コルーチン)なので、通常awaitを付けて使用します。
(例)
await asyncio.sleep_ms(10)

◎クラスEvent
タスク間の同期に使用するイベントです。

項目内容
class asyncio.Eventタスク間の同期に使うイベントのクラスです。
Event.is_set()イベントがセットされていればTrueを返し、そうでない場合はFalseを返します。
Event.set()イベントをセットします。
Event.clear()イベントをクリアします。
Event.wait()イベントがセットされるのを待ちます。
これは非同期関数(コルーチン)なので、通常awaitを付けて使用します。

◎クラスLock
タスク間の排他処理のために使用するロックです。

項目内容
class asyncio.Lockあるタスク実行中に他のタスクが実行されるのを防ぐことによって、タスク間の調整に使用できるロックのクラスです。
Lock.acquire()他のタスクでのロックが解除状態になるのを待ってからロックします。
これは非同期関数(コルーチン)なので、通常awaitを付けて使用します。

(例)
lock = asyncio.Lock()
・・・・
await lock.acquire()
Lock.release()ロックを解除します。
(例)
lock = asyncio.Lock()
・・・・
await lock.acquire()
#ロックされる部分(この間他のタスク実行無)
lock.release()

6.asyncioモジュールを使用したプログラム例

asyncioモジュールを使用したプログラム例として、複数のLEDの点滅処理と、複数のサーボモータの並列動作処理を示します。

6-1.複数のLEDの点滅処理

4個のタスクを同時に実行して複数のLEDを点滅する例を示します。LED駆動回路は図4を使用しました。

100ms間隔でLEDを点滅させるタスクと、300ms間隔でLEDを点滅させるタスクと、500ms間隔でLEDを点滅させるタスクと、2個のスイッチの値を常時読み込むタスクを同時に実行しています。

プログラムの動作は、スイッチSW1を押すと、それぞれのLEDが点滅する動作が開始され、もう一方のスイッチSW2を押すとLEDの点滅動作が停止します。

スイッチ状態を読み込むタスクと、LEDを点滅させるタスクの情報のやり取りはイベントを使用しています。スイッチ状態を読み込むタスクで、スイッチの状態に応じてイベントをset/clearして、そのイベントのset/clear状態をLEDを点滅させるタスクで読み取って点滅の開始と停止を行っています。

図4.LED点滅テスト回路図
from machine import Pin
import asyncio

#LED ON OFF処理
async def led_on_off(start_event, led, msec):
    print("LEDオンオフスレッドスタート")

    while True:
        if start_event.is_set() == True:	#スタートイベントON?
            led.on()
            await asyncio.sleep_ms(msec)	#msec待ち
            led.off()
        await asyncio.sleep_ms(msec)		#msec待ち

#スイッチ読込処理
async def sw_read(start_event, sw1, sw2):
    while True:
        if sw1.value() == 0:
            start_event.set()		#スタートイベントセット
        elif sw2.value() == 0:
            start_event.clear()		#スタートイベントリセット

        await asyncio.sleep_ms(10)	#10ms待ち

#メイン関数
async def main():
    start_event = asyncio.Event()				#イベント作成
    
    sw1 = Pin(20, Pin.IN, Pin.PULL_UP )			#"GP20をプルアップ付きスイッチ入力とする
    sw2 = Pin(19, Pin.IN, Pin.PULL_UP )			#"GP19をプルアップ付きスイッチ入力とする
    led1 = Pin(18, Pin.OUT)						#"GP18をLED出力とする
    led2 = Pin(17, Pin.OUT)						#"GP17をLED出力とする
    led3 = Pin(16, Pin.OUT)						#"GP16をLED出力とする
 
    task1 = asyncio.create_task(led_on_off(start_event, led1, 100))
    task2 = asyncio.create_task(led_on_off(start_event, led2, 300))
    task3 = asyncio.create_task(led_on_off(start_event, led3, 500))
    tesk4 = asyncio.create_task(sw_read(start_event, sw1, sw2))	#スイッチ読込タスクスタート
    
    await task1
    await task2
    await task3
    await task4
    
#----------メインプログラムスタート---------------------
asyncio.run(main())

【プログラム説明】
2行目:
import asyncio
は、asyncioモジュールを使用するために、asyncioをインポートしています。

5行目:
async def led_on_off(start_event, led, msec):
は、LEDを点滅させる非同期関数(コルーチン)です。defの前にasyncを付けて宣言しています。

9行目:
if start_event.is_set() == True:
は、イベントstart_eventがセットされているか確認します。セットされている場合は、LEDの点滅を開始します。
start_eventのセットは、スイッチの動作を確認するsw_read()非同期関数内で行われています。

18行目、19行目:
if sw1.value() == 0:
start_event.set()
は、スイッチSW1の状態を読み込んで、押されている時にイベントstart_eventをセットしています。

27行目:
start_event = asyncio.Event()
は、イベントstart_eventを作成しています。このイベントは、sw_read()非同期関数内でスイッチの状態を判断して、押されている場合にセットしています。それぞれのLED点滅タスク内で、このイベントを判断してセット状態の時にLEDの点滅処理を行っています。

35行目~38行目:
task1 = asyncio.create_task(led_on_off(start_event, led1, 100))
task2 = asyncio.create_task(led_on_off(start_event, led2, 300))
task3 = asyncio.create_task(led_on_off(start_event, led3, 500))
tesk4 = asyncio.create_task(sw_read(start_event, sw1, sw2))
は、LED1を100msec毎に点滅するタスクと、LED2を300msec毎に点滅するタスクと、LED3を500msec毎に点滅するタスクと、スイッチを読込判断するタスクの4つのタスクを起動しています。これらのタスクは並列に実行されます。

上記プログラム実行時の動画を以下に示します。

6-2.複数サーボモータの並列動作処理

◎サーボモータに関して

図5は、今回の実験で使用したサーボモータTower ProのMG996Rです。

図5.サーボモータ外観

サーボモータはPWM信号で制御します。サーボモータから出ている線の、橙線がPWM信号で、赤線が電源線(4.8V~7.2V)で、茶線がGND線です。
PWM信号は、図6の様にパルスのデューティ比が変化する信号です。

図6.PWM信号に関して



サーボモータは、PWM信号のデューティ比を変更することによって、軸の角度が変化します。

また、PWM信号の周波数は50Hzです。50Hzの周期は20msなので20ms内のパルスの幅によってサーボモータの軸の角度が変わります。
例えば図7は、サーボモータのPWM信号波形の例ですが、パルス幅1ms(デューティ比5%)の場合は軸の角度が0°、パルス幅2ms(デューティ比10%)の時は軸の角度が180°となります。

図7.サーボモータのPWM信号例

MG996Rサーボモータの資料には、パルス幅1msで-90°、1.5msで0°、2msで+90°と書いてありますが、そのパルス幅の範囲では全部で90°程度しか動作しませんでした。

パルス幅0.5ms(デューティ比2.5%)~2.4ms(デューティ比12%)とした場合にトータル180°動作したのでその値を使用することにしました。

パルス幅0.5ms (デューティ比2.5%)時に-90°、パルス幅1.5ms(デューティ比7.5%)時に0°、パルス幅2.4ms(デューティ比12%)時に90°の位置に回転しました。また、回転方向は-90°が時計方向、90°が反時計方向です。

◎サーボモータを動作させる回路

今回サーボモータを動作させるのに使用した回路を図8に示します。Raspberry Pi Pico WのI/OのGP0にサーボモータNo.1を接続し、GP1にサーボモータNo.2を接続しています。

また、サーボモータ動作開始用のスイッチをGP16に接続しています。

サーボモータ用の5V電源は、電流が大きいのでRaspberry Pi Pico Wの電源とは別に設けました。

図8.サーボモータ動作回路

◎複数のサーボモータの並列動作プログラム

以下にサーボモータ2個をマルチタスクで同時に動作させるプログラムを示します。
2個のタスクで、それぞれのサーボモータを独立に動作させています。部分的にイベントを使用して、2個のサーボモータの動作に同期を取っています。

プログラムの動作は、
押し釦スイッチを押すと、サーボモータNo.1は、90°→-90°→0°→75°→-75°→イベントセット→0°の順に動作します。
サーボモータNo.2は、-90°→90°→0°→-30°→0°→イベントセット待ち→75°→0°の順に動作します。

サーボモータNo.1側は-75°に到達した時点で、イベントをセットしています。サーボモータNo.2側は2回目の0°に到達した時点でイベントセット待ちをしていますので、サーボモータNo.1が-75°に到達した後に、0°から75°に回転開始します。

from machine import PWM, Pin
import asyncio

sw1 = Pin(16, Pin.IN, Pin.PULL_UP)	#GP16をプルアップ付きスイッチ入力とする

#サーボモータ用PWMピンの設定
servo1 = PWM(Pin(0), freq = 50)	#GP0がサーボモータNo.1
servo2 = PWM(Pin(1), freq = 50)	#GP1がサーボモータNo.2

plu90_duty = float(2.4 / 20.0)	#90deg時のデユーティ比
min90_duty = float(0.5 / 20.0)	#-90deg時のデユーティ比

#指定角度にサーボモータを動かす関数
#パラメータ
#deg: 移動角度 -90~90
#no:	サーボモータNo(1~2)
def sv_mov(deg, no):
    global servo1, servo2
    
    duty = int(65535 * (min90_duty + (plu90_duty - min90_duty) * (deg + 90) /180))
    if no == 1:
        servo1.duty_u16(duty)
    elif no == 2:
        servo2.duty_u16(duty) 

async def servo_mov1(event):
    sv_mov(90, 1)					#90°に回転
    await asyncio.sleep(1)			#1sec待ち
    sv_mov(-90, 1)					#-90°に回転
    await asyncio.sleep(1)			#1sec待ち
    sv_mov(0, 1)					#0°に回転
    await asyncio.sleep(1)			#1sec待ち
    sv_mov(75, 1)					#75°に回転
    await asyncio.sleep(0.5)		#0.5sec待ち
    sv_mov(-75, 1)					#-75°に回転
    await asyncio.sleep(1)			#1sec待ち
    
    event.set()						#イベントセット

    sv_mov(0, 1)					#0°に回転
    await asyncio.sleep(0.5)		#0.5sec待ち
    
async def servo_mov2(event):
    sv_mov(-90, 2)					#-90°に回転
    await asyncio.sleep(1)			#1sec待ち
    sv_mov(90, 2)					#-90°に回転
    await asyncio.sleep(1)			#1sec待ち
    sv_mov(0, 2)					#0°に回転
    await asyncio.sleep(1)			#1sec待ち 
    sv_mov(-30, 2)					#-30°に回転
    await asyncio.sleep(0.5)		#0.5sec待ち
    sv_mov(0, 2)					#0°に回転
    await asyncio.sleep(0.5)		#0.5sec待ち
    
    await event.wait()				#イベントセット待ち
    event.clear()
    
    sv_mov(75, 2)					#75°に回転
    await asyncio.sleep(0.5)		#0.5sec待ち
    sv_mov(0, 2)					#0°に回転
    await asyncio.sleep(0.5)		#0.5sec待ち
     

async def main():
    event = asyncio.Event()			#イベント作成
    
    while True:
        if sw1.value() == 0:		#スイッチON?
            task1 = asyncio.create_task(servo_mov1(event))
            task2 = asyncio.create_task(servo_mov2(event))
            await task1
            await task2
        await asyncio.sleep_ms(10)	#10msec待ち

#----------メインプログラム開始-------------------
asyncio.run(main())

【プログラム説明】
2行目:
import asyncio
は、asyncioをインポートしてasyncioモジュールを使用可能としています。

4行目:
sw1 = Pin(16, Pin.IN, Pin.PULL_UP)
は、Raspberry Pi Pico WのI/OのGP16に、押し釦スイッチを接続しているので、GP16をプルアップ付きの入力に設定しています。
押し釦スイッチの片側は0Vに接続されていますので、押し釦スイッチを押した時の入力は「0」となります。

7行目、8行目:
servo1 = PWM(Pin(0), freq = 50) #GP0がサーボモータNo.1
servo2 = PWM(Pin(1), freq = 50) #GP1がサーボモータNo.2
は、Raspberry Pi Pico WのI/OのGP0とGP1にサーボモータのPWM信号を割り当てています。PWM周波数50Hzに設定しています。

10行目,11行目:
plu90_duty = float(2.4 / 20.0) #90deg時のデユーティ比
min90_duty = float(0.5 / 20.0) #-90deg時のデユーティ比
は、サーボモータ軸が90°の時のPWMのパルス幅を2.4ms、-90°の時のPWMのパルス幅を0.5msとして、それぞれのデューティ比を計算しています。割り算の分母の20はPWM周波数50Hzの周期20msです。

17行目:
def sv_mov(deg, no):
は、サーボモータを回転させる関数です。引数のdegは回転角度、noはサーボモータ番号で、1がサーボモータNo.1で、2がサーボモータNo.2です。

20行目:
duty = int(65535 * (min90_duty + (plu90_duty – min90_duty) * (deg + 90) /180))
は、サーボモータに指令するPWM信号のデューティ比の計算です。

計算したPWM信号のデューティ比は、servo.duty_u16(duty)関数のdutyで指定し、サーボモータを動作させるのに使用されます。
dutyは100%を65535とする数値(int)で指定します。例えば20%の時は、65535 * 20/ 100 = 13107 となります。
ここでは、+90°の時のPWMのデューティ比(plu90_duty)と-90°の時のデューティ比(min90_duty)から指定角度に対するデューティ比を比例計算しています。

22行目、24行目:
servo1.duty_u16(duty)と、servo2.duty_u16(duty)
は、duty_16(duty)関数を使用して、PWMのデューティ比を指定することによってサーボモータを指定角度に回転させています。デューティ比は100%を65535とする数値(int)で指定します。例えば20%の時は、65535 * 20/ 100 = 13107 となります。

38行目:
event.set()
サーボモータNo.1が-75°に到達したことを、サーボモータNo.2を駆動しているタスクに知らせるためにイベントをセットしています。

55行目:
await event.wait()
サーボモータNo.2を0°に回転させた後に、イベントがセットされるのを待っています。イベントはサーボモータNo.1を駆動するタスクで、軸の角度が-75°に到達した時にセットされます。
イベントがセットされると56行目以降が処理され、サーボモータNo.2は75°に回転します。

65行目:
event = asyncio.Event()
は、サーボモータを駆動する2個のタスク間の同期を取るためのイベントを作成しています。

69行目、70行目
task1 = asyncio.create_task(servo_mov1(event))
task2 = asyncio.create_task(servo_mov2(event))
は、servo_mov1()とservo_mov2()の2個のタスクを起動しています。それぞれのタスクはそれぞれ1個のサーボモータを回転する処理を行います。

上記プログラム実行時の動画を以下に示します。


PAGE TOP