教程里是购买的淘宝上集成好的小车器件。另外自己也买了一些散件配置了一辆自己的小车。
https://detail.tmall.com/item.htm?id=608554421638&spm=a1z09.2.0.0.7e012e8d3NEMy0&_u=12kf16b6b4b
组件包括:
1、小车底板(2个)、电机(4个)、车轮(4个)、杜邦线、铜柱、螺丝若干
2、超声波传感器(1个) + 舵机(1个)
3、循迹传感器(3个)
4、避障传感器(2个,左右)
5、USB摄像头(1个) + 舵机(2个)
6、树莓派4B主控板(2G版本)(1个)
7、树莓派扩展板(1个)
8、电池(1个)
9、电压显示模块(1个)
我最终购买一套花了620元,如果按照散件购买,价格估算如下:
1、小车底板一套:35元
2、超声波+1个舵机+小部件:30元
3、循迹传感器:15元
4、避障传感器:5元
5、USB摄像头+2个舵机:50元
6、树莓派扩展板(2G版本):320元
7、电池:20元
8、电压显示模块:5元
9、树莓派扩展板:140元
10、电子教程一套:0元
共计:620元。
也就是说这家店铺等于赚了一个卖散件和自家工厂做的扩展板的利润,没赚取集成费用。
建议最经济的做法买套餐B,树莓派不是什么USB摄像头都能兼容的,所以需要买个B套餐摄像头,对于C和D,完全没必要,其它的传感器完全可以以后买。另外如果还想省钱,可以买不带树莓派4B的主控板,买一块树莓派ZeroWH的主控板,可以省200元,用于小车实现性能足够了。
树莓派集成扩展板其实内部集成了两块L298N的电机驱动芯片,参考散件其工作原理如下:
简单点就是ENA和ENB是控制车轮速度,IN1、IN2是控制车轮状态(正转、反转、停止)。上图输出A和输出B是分别给车轮电机供电。
#CarMotor.py import RPi.GPIO as GPIO import asyncio class SingleMotor: def __init__(self, IN1, IN2, PWM=None): self.speed = 35 self.freq = 50 self.run_state = "stop" self.PWM = PWM self.IN1 = IN1 self.IN2 = IN2 GPIO.setup(IN1, GPIO.OUT) GPIO.setup(IN2, GPIO.OUT) self.Motor = None if self.PWM != None: GPIO.setup(PWM, GPIO.OUT) self.Motor = GPIO.PWM(PWM, self.freq) self.Motor.start(0) #设置速度(0-100) def set_speed(self, speed): if self.Motor == None: return if speed > 100: self.speed = 100 elif speed < 0: self.speed = 0 else: self.speed = speed self.Motor.ChangeDutyCycle(self.speed) def get_speed(self): return self.speed def inc_speed(self, v): self.set_speed(self.speed + v) def dec_speed(self, v): self.set_speed(self.speed - v) def get_run_state(self): return self.run_state #正转 def up(self): self.run_state = "up" GPIO.output(self.IN1, GPIO.HIGH) GPIO.output(self.IN2, GPIO.LOW) #反转 def down(self): self.run_state = "down" GPIO.output(self.IN1, GPIO.LOW) GPIO.output(self.IN2, GPIO.HIGH) #停止 def stop(self): self.run_state = "stop" GPIO.output(self.IN1, GPIO.LOW) GPIO.output(self.IN2, GPIO.LOW) class CarWheel: def __init__(self, L_Motor, R_Motor): self.L_Wheel = L_Motor self.R_Wheel = R_Motor self.speed = 35 self.set_speed(self.speed) self.run_state = "stop" def set_speed(self, speed): if speed > 100: self.speed = 100 elif speed < 0: self.speed = 0 else: self.speed = speed self.L_Wheel.set_speed(self.speed) self.R_Wheel.set_speed(self.speed) def get_speed(self): return self.speed def inc_speed(self, v): self.L_Wheel.inc_speed(v) self.R_Wheel.inc_speed(v) self.speed = self.L_Wheel.get_speed() def dec_speed(self, v): self.L_Wheel.dec_speed(v) self.R_Wheel.dec_speed(v) self.speed = self.L_Wheel.get_speed() def get_run_state(self): return self.run_state def up(self): self.run_state = "up" self.L_Wheel.up() self.R_Wheel.up() def stop(self): self.run_state = "stop" self.L_Wheel.stop() self.R_Wheel.stop() def down(self): self.run_state = "down" self.L_Wheel.down() self.R_Wheel.down() #小车左转,控制左轮不动,右轮前进即可 def left(self): self.run_state = "left" self.L_Wheel.stop() self.R_Wheel.up() #小车右转,控制右轮不动,左轮前进即可 def right(self): self.run_state = "right" self.L_Wheel.up() self.R_Wheel.stop() async def motor_task(): L_Motor = SingleMotor( PWM = 18, IN1 = 22, IN2 = 27 ) R_Motor = SingleMotor( PWM = 23, IN1 = 25, IN2 = 24 ) cm = CarWheel( L_Motor = L_Motor, R_Motor = R_Motor ) t_time = 2 cm.inc_speed(5) cm.up() await asyncio.sleep(t_time) cm.dec_speed(5) cm.down() await asyncio.sleep(t_time) cm.left() await asyncio.sleep(t_time) cm.right() await asyncio.sleep(t_time) cm.stop() await asyncio.sleep(t_time) async def main(): dltasks = set() dltasks.add(asyncio.ensure_future(motor_task())) dones, dltasks = await asyncio.wait(dltasks) for task in dones: print("Task ret: ", task.result()) if __name__ == '__main__': GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) loop = asyncio.get_event_loop() loop.run_until_complete(main()) try: loop.run_forever() except KeyboardInterrupt: pass finally: GPIO.cleanup()
#https://gpiozero.readthedocs.io/en/stable/recipes.html#distance-sensor #DistanceSensor_t.py from gpiozero import DistanceSensor from time import sleep sensor = DistanceSensor(23, 24) while True: print('Distance to nearest object is', sensor.distance, 'm') sleep(1)
#joystick_t.py import asyncio import pygame from pygame.locals import * async def joystick_task(self): #JOYAXISMOTION joy, axis, value #JOYBALLMOTION joy, ball, rel #JOYHATMOTION joy, hat, value #JOYBUTTONUP joy, button #JOYBUTTONDOWN joy, button while True: for event in pygame.event.get(): # if event.type == pygame.JOYAXISMOTION: # print("Joystick JOYAXISMOTION.") # if event.type == pygame.JOYBALLMOTION: # print("Joystick JOYBALLMOTION.") if event.type == pygame.JOYHATMOTION: # print("Joystick JOYHATMOTION.") if event.value == (-1, 0): print('left') elif event.value == (0, 1): print('up') elif event.value == (0, -1): print('down') elif event.value == (1, 0): print('right') elif event.value == (0, 0): print('stop') else: print(event.value) elif event.type == pygame.JOYBUTTONDOWN: print("Joystick JOYBUTTONDOWN.") if joystick.get_button(0) == 1: print('button[Y]') elif joystick.get_button(1) == 1: print('button[B]') elif joystick.get_button(2) == 1: print('button[A]') elif joystick.get_button(3) == 1: print('button[X]') else: pass elif event.type == pygame.JOYBUTTONUP: print("Joystick JOYBUTTONUP.") else: pass await asyncio.sleep(0.1) async def main(): pygame.init() pygame.joystick.init() # Get count of joysticks joystick_count = pygame.joystick.get_count() print("Number of joysticks: {0}".format(joystick_count)) joystick0 = pygame.joystick.Joystick(0) joystick.init() dltasks = set() dltasks.add(asyncio.ensure_future(joystick_task())) dones, dltasks = await asyncio.wait(dltasks) for task in dones: print("Task ret: ", task.result()) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main()) try: loop.run_forever() except KeyboardInterrupt: pass finally: pygame.quit()
#见主程序
#CarServo.py import Adafruit_PCA9685 import RPi.GPIO as GPIO import time class CarServo: def __init__(self): #2个摄像头舵机,1个超声波舵机 self.pwm_pca9685 = Adafruit_PCA9685.PCA9685() self.pwm_pca9685.set_pwm_freq(50) self.servo = {} self.set_servo_angle(0, 110) self.set_servo_angle(1, 100) self.set_servo_angle(2, 20) #输入角度转换成12^精度的数值 def set_servo_angle(self, channel, angle): if (channel >= 0) and (channel <= 2): new_angle = angle if angle < 0: new_angle = 0 elif angle > 180: new_angle = 180 else: new_angle = angle print("channel={0}, angle={1}".format(channel, new_angle)) #date=4096*((new_angle*11)+500)/20000#进行四舍五入运算 date=int(4096*((angle*11)+500)/(20000)+0.5) date = int(4096*((new_angle*11)+500)/(20000)+0.5) self.pwm_pca9685.set_pwm(channel, 0, date) self.servo[channel] = new_angle else: print("set_servo_angle error. servo[{0}] = [{1}]".format(channel, angle)) def inc_servo_angle(self, channel, v): self.set_servo_angle(channel, self.servo[channel] + v) def dec_servo_angle(self, channel, v): self.set_servo_angle(channel, self.servo[channel] - v) if __name__ == '__main__': GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) cs = CarServo() cs.inc_servo_angle(0, 10) cs.inc_servo_angle(1, 10) cs.inc_servo_angle(2, 10) time.sleep(2) cs.dec_servo_angle(0, 10) cs.dec_servo_angle(1, 10) cs.dec_servo_angle(2, 10) time.sleep(2) GPIO.cleanup()
#见主程序
import RPi.GPIO as GPIO import time pin_avoid_obstacle=18 GPIO.setmode(GPIO.BCM) GPIO.setup(pin_avoid_obstacle, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) try: while True: status = GPIO.input(pin_avoid_obstacle) if status == TRUE: print('我是红外避障模组,没有检测到障碍物,一切正常!') else: print('我是红外避障模组,检测到障碍物,注意停车') time.sleep(0.5) except KeyboradInterrupt: pass finally: GPIO.cleanup()
#家里地下暂时不想贴黑线,所有没写这块代码
#ppycar.py #!/usr/bin/python3 # -*- coding: utf-8 -*- import asyncio import aiohttp import websockets import json import time import logging import threading import sys import cv2 from aiohttp import web from concurrent.futures import ThreadPoolExecutor import RPi.GPIO as GPIO import pygame from pygame.locals import * from CarMotor import CarWheel, SingleMotor from gpiozero import DistanceSensor #from CarDistance import CarDistance from CarServo import CarServo class CwCar: def __init__(self): #左边前轮和后轮(电路确保行动一致) L_Motor = SingleMotor( PWM = 18, IN1 = 22, IN2 = 27 ) #右边前轮和后轮(电路确保行动一致) R_Motor = SingleMotor( PWM = 23, IN1 = 25, IN2 = 24 ) #小车 self.cm = CarWheel( L_Motor = L_Motor, R_Motor = R_Motor ) #超声波测距 #self.cd = CarDistance( # GPIO_TRIGGER = 20, # GPIO_ECHO = 21 # ) self.sensor = DistanceSensor(21, 20) #2个摄像头舵机,1个超声波舵机 self.cs = CarServo() #线程池,用于封装非协程对象 self.__executor = ThreadPoolExecutor(10) self.__loop = asyncio.get_event_loop() self.cap = cv2.VideoCapture(0) self.jpg_bytes = None #非协程对象的阻塞函数调用封装 async def ThreadRun(self, func, *args,**kwargs): return await self.__loop.run_in_executor(self.__executor, func, *args,**kwargs) #定期更新超声波所测的障碍物距离 async def distance_task(self, update_time): print("distance_task start") while True: dist_float = self.sensor.distance dist = int(dist_float * 100) #当距离不到40cm的时候且运行状态为前进,则小车停止运行 if (dist < 80) and (self.cm.get_run_state() == 'up'): self.cm.stop() await asyncio.sleep(update_time) print("distance_task end") #手柄指令控制小车运行 async def joystick_task(self): joystick = pygame.joystick.Joystick(0) joystick.init() while True: for event in pygame.event.get(): #手柄左边的上下左右键,控制小车运行 if event.type == pygame.JOYHATMOTION: if event.value == (-1, 0): print('left') self.cm.left() elif event.value == (0, 1): print('up') dist_float = self.sensor.distance dist = int(dist_float * 100) if dist > 80: self.cm.up() elif event.value == (0, -1): print('down') self.cm.down() elif event.value == (1, 0): print('right') self.cm.right() elif event.value == (0, 0): print('stop') self.cm.stop() else: print(event.value) #控制小车速度 elif event.type == pygame.JOYBUTTONDOWN: print("Joystick JOYBUTTONDOWN.") if joystick.get_button(0) == 1: print('button[Y]') elif joystick.get_button(1) == 1: print('button[B]') self.cm.inc_speed(5) elif joystick.get_button(2) == 1: print('button[A]') elif joystick.get_button(3) == 1: print('button[X]') self.cm.dec_speed(5) #控制摄像头舵机 elif joystick.get_button(4) == 1: print('button[4]') self.cs.inc_servo_angle(1, 5) elif joystick.get_button(5) == 1: print('button[5]') self.cs.dec_servo_angle(1, 5) elif joystick.get_button(6) == 1: print('button[6]') self.cs.inc_servo_angle(2, 5) elif joystick.get_button(7) == 1: print('button[7]') self.cs.dec_servo_angle(2, 5) else: pass await asyncio.sleep(0.1) #接收远程网页上发送的小车控制指令 async def car_recv_msg(self, ws, msg): response_ojb = { "cmd" : "None", "ret" : 0, "errinfo": "" } try: recv_text = msg.data recv_json_obj = json.loads(recv_text) print(recv_json_obj) cmd = recv_json_obj['cmd'] response_ojb['cmd'] = cmd print("cmd = {0}".format(cmd)) if cmd == "stop": self.cm.stop() elif cmd == "up": dist = int(await self.ThreadRun(self.cd.distance)) if dist > 80: self.cm.up() elif cmd == "down": self.cm.down() elif cmd == "left": self.cm.left() elif cmd == "right": self.cm.right() elif cmd == "inc": self.cm.inc_speed(5) speed = self.cm.get_speed() response_ojb["speed"] = speed elif cmd == "dec": self.cm.dec_speed(5) speed = self.cm.get_speed() response_ojb["speed"] = speed else: print("cmd error") except Exception as e: print("recv json error:{0}:{1}:{2}" .format( e.__traceback__.tb_frame.f_globals["__file__"], e.__traceback__.tb_lineno, e)) response_ojb["ret"] = 1 response_ojb["errorinfo"] = "recv json error" response_json = json.dumps(response_ojb) await ws.send_str(response_json) #websocket接收消息协议解析 async def websocket_car_handler(self, request): print('websocket_car_handler start...') ws = web.WebSocketResponse() await ws.prepare(request) while not ws.closed: msg = await ws.receive() if msg.type == aiohttp.WSMsgType.text: await self.car_recv_msg(ws, msg) elif msg.type == aiohttp.WSMsgType.close: print('websocket connection closed') elif msg.type == aiohttp.WSMsgType.error: print('ws connection closed with exception {0}'.format(ws.exception())) else: await ws.send_str('websocket_car_handler') return ws #返回摄像头图像数据给请求网页 async def websocket_webcam_handler(self, request): print('websocket_webcam_handler start...') ws = web.WebSocketResponse() await ws.prepare(request) while not ws.closed: msg = await ws.receive() if msg.type == aiohttp.WSMsgType.text: print(msg.data, end='') if self.jpg_bytes != None: await ws.send_bytes(self.jpg_bytes) elif msg.type == aiohttp.WSMsgType.close: print('websocket connection closed') elif msg.type == aiohttp.WSMsgType.error: print('ws connection closed with exception {0}'.format(ws.exception())) else: await ws.send_str('websocket_webcam_handler') return ws #定时采集摄像头图像数据 async def webcam_read_task(self): print("{0}:{1}".format("webcam_read_task", "start")) encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 50] while True: await self.ThreadRun(self.__webcam_read, encode_param) await asyncio.sleep(0.05) print("{0}:{1}".format("webcam_read_task", "end")) #摄像头图像数据采集 def __webcam_read(self, encode_param): ret, frame = self.cap.read() ret, jpg = cv2.imencode('.jpg', frame, encode_param) self.jpg_bytes = jpg.tobytes() async def main(main_loop): cc = CwCar() local_ip = '0.0.0.0' local_port = 5678 app = web.Application() app.router.add_route('GET', '/ppycar', cc.websocket_car_handler) app.router.add_route('GET', '/sendframe', cc.websocket_webcam_handler) app.router.add_static('/static', path='webroot', show_index=True) runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, local_ip, local_port) print("MainLoop:Server started at http://{0}...:{1}".format(local_ip, local_port)) dltasks = set() dltasks.add(asyncio.ensure_future(cc.joystick_task())) #手柄控制 dltasks.add(asyncio.ensure_future(site.start())) #websockets网页控制 dltasks.add(asyncio.ensure_future(cc.webcam_read_task())) #定时摄像头图像数据采集 dltasks.add(asyncio.ensure_future(cc.distance_task(update_time = 0.05)))#定时超声波测距 dones, dltasks = await asyncio.wait(dltasks) for task in dones: print("Main Task ret: ", task.result()) if __name__ == '__main__': pygame.init() pygame.joystick.init() joystick_count = pygame.joystick.get_count() print("Number of joysticks: {0}".format(joystick_count)) #确保有且只有一个手柄控制 if joystick_count != 1: print("joystick_count < 1, sys.exit.") GPIO.cleanup() pygame.quit() sys.exit() GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) main_loop = asyncio.get_event_loop() main_loop.run_until_complete(main(main_loop)) try: main_loop.run_forever() except KeyboardInterrupt: pass finally: GPIO.cleanup() pygame.quit()
ppy_car_web.html
<!DOCTYPE html> <html lang="zh-cmn-Hans"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width,initial-scale=1,user-scalable=0,viewport-fit=cover"> <title>小车远程控制</title> <link rel="stylesheet" href="https://res.wx.qq.com/open/libs/weui/2.4.4/weui.css"/> <script type="text/javascript"> let ws; let wsurl = "ws://" + window.location.hostname + ":5678/ppycar" //let wsurl = "ws://192.168.3.128:5678/ppycar" let speed = 35; function car_cmd(car_cmd_json){ let jsonText = JSON.stringify(car_cmd_json); console.log(jsonText); if (ws.readyState == 1){ ws.send(jsonText); }else{ console.log("ws.readyState = " + ws.readyState) } } function car_speed(cmd){ car_cmd({cmd: cmd}) } function car_move(cmd){ car_cmd({cmd: cmd}) } function car_distance(){ car_cmd({cmd: 'distance'}) } function ws_conn(){ ws = new WebSocket(wsurl); console.log("ws_conn."); ws.onopen = function() { console.log("do_ws_open."); }; ws.onmessage = function (evt) { let received_msg = evt.data; console.log("recv info:" + received_msg); let obj = JSON.parse(received_msg) if((obj.cmd == 'inc') || (obj.cmd == 'dec')){ document.getElementById("car_speed").innerHTML = obj.speed + "(区间:0-100)"; } }; ws.onclose = function() { console.log("ws close."); }; } function ws_close(){ console.log("do_ws_close."); ws.close(); } </script> <script> let webcam_wsurl = "ws://" + window.location.hostname + ":5678/sendframe" let socket = new WebSocket(webcam_wsurl); let img = new Image(); function sendMsg() { socket.send("update"); console.log("socket: send update"); } function Uint8ToString(u8a) { var CHUNK_SZ = 0x8000; var c = []; for (var i = 0; i < u8a.length; i += CHUNK_SZ) { c.push(String.fromCharCode(...u8a.subarray(i, i + CHUNK_SZ))); } return c.join(""); } function drawFrame(frame) { var uint8Arr = new Uint8Array(frame); var str = Uint8ToString(uint8Arr); var base64String = btoa(str); img.onload = function () { context.drawImage(this, 0, 0, canvas.width, canvas.height); }; img.src = "data:image/png;base64," + base64String; } socket.onopen = () => { console.log("socket: connected"); }; socket.onmessage = (msg) => { msg.data.arrayBuffer().then((buffer) => { drawFrame(buffer); console.log("socket: frame updated"); }); }; </script> </head> <body onl oad="ws_conn();"> <div class="page"> <div class="weui-form"> <div class="weui-form__text-area"> <h2 class="weui-form__title">小车远程控制</h2> </div> <div class="weui-cells weui-cells_form"> <div class="weui-cell"> <div class="weui-cell__bd"> </div> <div class="weui-cell__bd"> <button class="weui-btn weui-btn_primary" onclick="car_move('up');" id="car_up">前进</button> </div> <div class="weui-cell__bd"> </div> </div> <div class="weui-cell"> <div class="weui-cell__bd"> <button class="weui-btn weui-btn_primary" onclick="car_move('left');" id="car_left">左转</button> </div> <div class="weui-cell__bd"> <button class="weui-btn weui-btn_primary" onclick="car_move('down');" id="car_down">后退</button> </div> <div class="weui-cell__bd"> <button class="weui-btn weui-btn_primary" onclick="car_move('right');" id="car_right">右转</button> </div> </div> <div class="weui-cell"> <div class="weui-cell__bd"> <button class="weui-btn weui-btn_primary" onclick="car_speed('dec');" id="speed_dec">减速</button> </div> <div class="weui-cell__bd"> <button class="weui-btn weui-btn_primary" onclick="car_speed('inc');" id="speed_inc">加速</button> </div> <div class="weui-cell__bd"> <button class="weui-btn weui-btn_primary" onclick="car_move('stop');" id="cat_stop">停止</button> </div> </div> </div> <div class="weui-form__text-area"> <h2 class="weui-form__title">小车数据采集</h2> </div> <div class="weui-cells"> <div class="weui-cell weui-cell_example"> <div class="weui-cell__hd"><i class="weui-icon-waiting weui-icon_msg"></i></div> <div class="weui-cell__bd"> <p>小车速度</p> </div> <div class="weui-cell__ft" id="car_speed">35(0-100)</div> </div> <div class="weui-cell weui-cell_example"> <div class="weui-cell__hd"><i class="weui-icon-waiting weui-icon_msg"></i></div> <div class="weui-cell__bd"> <p>超声波距离</p> </div> <div class="weui-cell__ft">0(毫米)</div> </div> </div> <div class="weui-form__text-area"> <h2 class="weui-form__title">小车摄像头</h2> </div> <div class="weui-cells" align="center"> <canvas id="canvas-video" width="512" height="384"></canvas> </div> <script> const canvas = document.getElementById("canvas-video"); const context = canvas.getContext("2d"); // show loading notice context.fillStyle = "#333"; context.fillText("Loading...", canvas.width / 2 - 30, canvas.height / 3); setInterval(() => { socket.send("x"); }, 100); </script> </div> </div> </body> </html>