import subprocess import re import threading lock = threading.Lock() # 获取连接的设备 def device_get(): cmd = "adb devices" result = subprocess.run(cmd.split(" "), capture_output=True, text=True) stdout = result.stdout # print("------ stdout --------") # print(stdout) # print("------ stdout --------") devices = [] for line in stdout.split("\n")[1:]: line = line.strip() if line != "": match = re.match(r"(\S+)", line) devices.append(match.group(1)) return devices # 连接远程设备 def device_connect(ip, port = 5555): cmd = "adb connect {}:{}".format(ip, port) result = subprocess.run(cmd.split(" "), capture_output=True, text=True) stdout = result.stdout # print(stdout) return stdout.startswith("connected to") # 断开设备 def device_disconnect(ip, port = 5555): cmd = "adb disconnect {}:{}".format(ip, port) result = subprocess.run(cmd.split(" "), capture_output=True, text=True) stdout = result.stdout # print(stdout) return stdout.startswith("disconnected") class Operator: def __init__(self, id): self.id = id def home(self): cmd = "adb -s {} shell input keyevent 3".format(self.id) self._run(cmd) def back(self): cmd = "adb -s {} shell input keyevent 4".format(self.id) self._run(cmd) def power(self): cmd = "adb -s {} shell input keyevent 26".format(self.id) self._run(cmd) def touch(self, x, y): cmd = "adb -s {} shell input tap {} {}".format(self.id, x, y) self._run(cmd) def swipe(self, startX, startY, endX, endY, time): cmd = "adb -s {} shell input swipe {} {} {} {} {}".format(self.id, startX, startY, endX, endY, time) self._run(cmd) def _run(self, cmd): with lock: subprocess.run(cmd.split(" "))