"""Script that drives app tasks on a WikoHi70m device, then locks the device."""

import random
import threading

from devices import WikoHi70m
import apps


def _jitter(base):
    """Return ``base`` shifted by ``int(random.uniform(-10, 10))``."""
    return base + int(random.uniform(-10, 10))


def _random_swipe(device, x1, y1, x2, y2, last):
    """Build a zero-argument callable that performs one jittered swipe.

    Each call jitters the four positional values independently (in the
    order x1, y1, x2, y2) and forwards them, followed by ``last``
    unchanged, to ``device.swipe``.

    NOTE(review): the arguments are presumably start/end screen
    coordinates plus a duration -- confirm against ``WikoHi70m.swipe``.
    """
    def perform():
        return device.swipe(_jitter(x1), _jitter(y1), _jitter(x2), _jitter(y2), last)

    return perform


def main():
    """Create the task objects, run the selected ones, then lock the device.

    Only the video task and the browse task are started; the remaining
    tasks are constructed but not started. A ``KeyboardInterrupt`` raised
    anywhere in this sequence is caught and reported with a printed
    message; in that case the device is not locked.
    """
    try:
        # adb_cmd.mobile_unlock()
        # Device identifiers: 3URNU24803102309 192.168.8.138:5555
        device = WikoHi70m('3URNU24803102309')

        # Regular tasks
        watch_ad_task = apps.Task(
            "看广告",
            apps.Rule.count(500, 45, 2),
            lambda: apps.TomatoListen(device).func_ad(),
        )
        watch_video_task = apps.Task.watch_video(
            "刷视频", apps.Rule.count(500, 8, 2), device
        )
        read_book_task = apps.Task.read_book(
            "看书", apps.Rule.count(1000, 5, 2), device
        )
        browse_task = apps.Task.browse("逛街", apps.Rule.time(60, 5, 2), device)

        # Window tasks (background and small-window variants)
        bg_watch_video_task = apps.Task(
            "后台刷视频",
            apps.Rule.count(500, 10, 2),
            _random_swipe(device, 240, 1200, 240, 1100, 50),
        )
        bg_read_book_task = apps.Task(
            "后台看书",
            apps.Rule.count(1000, 5, 2),
            _random_swipe(device, 500, 1200, 240, 1200, 80),
        )
        min_watch_video_task = apps.Task(
            "小窗刷视频",
            apps.Rule.count(500, 20, 2),
            _random_swipe(device, 450, 400, 450, 200, 50),
        )
        min_read_book_task = apps.Task(
            "小窗看书",
            apps.Rule.count(1000, 5, 2),
            _random_swipe(device, 600, 400, 400, 400, 80),
        )

        # Split-screen tasks
        # The third swipe argument here uses a different offset
        # (minus a value drawn from 10..20), so it is spelled out inline.
        top_task = apps.Task(
            "刷视频",
            apps.Rule.count(500, 8, 2),
            lambda: device.swipe(
                _jitter(350),
                _jitter(1250),
                350 - int(random.uniform(10, 20)),
                _jitter(900),
                20,
            ),
        )
        bottom_task = apps.Task(
            "看书",
            apps.Rule.count(500, 5, 2),
            _random_swipe(device, 600, 350, 400, 350, 60),
        )

        # Tasks actually run, in this order.
        watch_video_task.start()
        browse_task.start()

        # Watch video + read book in a small window, run together:
        # sync_task(top_task, bottom_task)

        # Lock the screen once the tasks are done.
        device.lock()
    except KeyboardInterrupt:
        print("程序被用户中断")


def sync_task(task1, task2):
    """Run two tasks at the same time and wait for both to finish.

    Each task's ``start()`` is invoked on its own thread. Both threads are
    started before either is joined, and the call returns only after both
    threads have completed.
    """
    workers = [
        threading.Thread(target=lambda task=task: task.start())
        for task in (task1, task2)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()


if __name__ == "__main__":
    main()