优化-抽出执行任务

This commit is contained in:
wangzhengzhen
2024-12-30 17:20:05 +08:00
parent d0e86cb4e2
commit 505e803142
2 changed files with 54 additions and 103 deletions

View File

@@ -33,6 +33,26 @@ class _App:
def open_function(self): def open_function(self):
pass pass
class Task:
def __init__(self, name, rule, func):
self.name = name
self.rule = rule
self.func = func
@classmethod
def watch_video(cls, name, rule, device):
return cls(name, rule, lambda : device.swipe_up(150, 50))
@classmethod
def read_book(cls, name, rule, device):
return cls(name, rule, lambda : device.swipe_left(300, 100))
def start(self):
print("开始执行任务:{}".format(self.name))
_exec(self.rule, self.func, self.name)
print("任务{}执行完毕".format(self.name))
pass
def _exec(rule, func = [], log_prefix = "执行任务"): def _exec(rule, func = [], log_prefix = "执行任务"):
factor = getattr(rule, "factor") factor = getattr(rule, "factor")
if hasattr(rule, "num"): if hasattr(rule, "num"):
@@ -78,87 +98,35 @@ class ToutiaoLite(_App):
print("开宝箱...") print("开宝箱...")
print("开宝箱完成...") print("开宝箱完成...")
def watch_video(self, rule): # 看广告
print("开始看视频任务...") def func_ad(self):
_exec(rule, lambda : self.device.swipe_up(150, 50), "看视频中")
print("看视频任务完成")
def watch_ad(self, rule):
print("开始看广告任务...")
# 0.3
def func():
button = ui.Button(240, 450, 850, 920) button = ui.Button(240, 450, 850, 920)
point = button.get_point() point = button.get_point()
self.device.back() self.device.back()
self.device.back() # 防止自动进入直播界面 self.device.back() # 防止自动进入直播界面
self.device.click(point) self.device.click(point)
_exec(rule, lambda : func(), "看广告中")
print("看广告任务完成...")
class DouyinLite(_App): class DouyinLite(_App):
def __init__(self, device): def __init__(self, device):
_App.__init__(self, "抖音极速版") _App.__init__(self, "抖音极速版")
self.device = device self.device = device
def watch_video(self, rule):
print("开始看视频任务...")
_exec(rule, lambda : self.device.swipe_up(120, 80), "看视频中")
print("看视频任务完成")
class KuaishouLite(_App): class KuaishouLite(_App):
def __init__(self, device): def __init__(self, device):
_App.__init__(self, "快手极速版") _App.__init__(self, "快手极速版")
self.device = device self.device = device
def watch_video(self, rule):
print("开始看视频任务...")
_exec(rule, lambda : self.device.swipe_up(120, 80), "看视频中")
print("看视频任务完成")
def read_book(self, rule):
print("开始看书任务...")
_exec(rule, lambda : self.device.swipe_left(300, 100), "看书中")
print("看书任务完成")
class TomatoListen(_App): class TomatoListen(_App):
def __init__(self, device): def __init__(self, device):
_App.__init__(self, "番茄畅听") _App.__init__(self, "番茄畅听")
self.device = device self.device = device
def watch_video(self, rule):
print("开始看视频任务...")
_exec(rule, lambda : self.device.swipe_up(120, 80), "看视频中")
print("看视频任务完成")
def read_book(self, rule):
print("开始看书任务...")
_exec(rule, lambda : self.device.swipe_left(300, 100), "看书中")
print("看书任务完成")
class TomatoFiction(_App): class TomatoFiction(_App):
def __init__(self, device): def __init__(self, device):
_App.__init__(self, "番茄小说") _App.__init__(self, "番茄小说")
self.device = device self.device = device
def read_book(self, rule):
print("开始看书任务...")
_exec(rule, lambda : self.device.swipe_left(300, 100), "看书中")
print("看书任务完成")
def watch_video(self, rule):
print("开始看视频任务...")
_exec(rule, lambda : self.device.swipe_up(120, 80), "看视频中")
print("看视频任务完成")
class Alipay(_App): class Alipay(_App):
def __init__(self, device): def __init__(self, device):
_App.__init__(self, "支付宝") _App.__init__(self, "支付宝")
self.device = device self.device = device
def watch_video(self, rule):
print("开始看视频任务...")
_exec(rule, lambda : self.device.swipe_up(120, 80), "看视频中")
print("看视频任务完成")

View File

@@ -1,60 +1,32 @@
from devices import WikoHi70m from devices import WikoHi70m
import apps import apps
import random import random
import threading
def main(): def main():
# adb_cmd.mobile_unlock()
try: try:
# adb_cmd.mobile_unlock()
# 3URNU24803102309 192.168.8.138:5555 # 3URNU24803102309 192.168.8.138:5555
wikoHi70m = WikoHi70m('3URNU24803102309') wikoHi70m = WikoHi70m('3URNU24803102309')
func = [ # 任务
# 上滑视频 watchVideoTask = apps.Task.watch_video("刷视频", apps.Rule.count(500, 8, 2), wikoHi70m)
# lambda : device.swipe(240 + int(random.uniform(-10, 10)), 1200 + int(random.uniform(-10, 10)), 240 + int(random.uniform(-10, 10)), 1100 + int(random.uniform(-10, 10)), 50) bgWatchVideoTask = apps.Task("后台刷视频", apps.Rule.count(500, 8, 2),
# 左滑 lambda : wikoHi70m.swipe(240 + int(random.uniform(-10, 10)), 1200 + int(random.uniform(-10, 10)), 240 + int(random.uniform(-10, 10)), 1100 + int(random.uniform(-10, 10)), 50))
lambda : wikoHi70m.swipe(500 + int(random.uniform(-10, 10)), 1200 + int(random.uniform(-10, 10)), 240 + int(random.uniform(-10, 10)), 1200 + int(random.uniform(-10, 10)), 80), minWatchVideoTask = apps.Task("小窗刷视频", apps.Rule.count(500, 8, 2),
lambda : wikoHi70m.swipe(450 + int(random.uniform(-10, 10)), 400 + int(random.uniform(-10, 10)), 450 + int(random.uniform(-10, 10)), 200 + int(random.uniform(-10, 10)), 50))
readBookTask = apps.Task.read_book("看书", apps.Rule.count(1000, 5, 2), wikoHi70m)
minReadBookTask = apps.Task("小窗看书", apps.Rule.count(1000, 5, 2),
lambda : wikoHi70m.swipe(600 + int(random.uniform(-10, 10)), 400 + int(random.uniform(-10, 10)), 400 + int(random.uniform(-10, 10)), 400 + int(random.uniform(-10, 10)), 80))
bgReadBookTask = apps.Task("后台看书", apps.Rule.count(1000, 5, 2),
lambda : wikoHi70m.swipe(500 + int(random.uniform(-10, 10)), 1200 + int(random.uniform(-10, 10)), 240 + int(random.uniform(-10, 10)), 1200 + int(random.uniform(-10, 10)), 80))
# 左滑(小窗口)
lambda : wikoHi70m.swipe(600 + int(random.uniform(-10, 10)), 400 + int(random.uniform(-10, 10)), 400 + int(random.uniform(-10, 10)), 400 + int(random.uniform(-10, 10)), 80)
# 上滑浏览(小窗口)
# lambda : device.swipe(450 + int(random.uniform(-10, 10)), 400 + int(random.uniform(-10, 10)), 450 + int(random.uniform(-10, 10)), 200 + int(random.uniform(-10, 10)), 300)
# 上滑视频(小窗口)
# lambda : device.swipe(450 + int(random.uniform(-10, 10)), 400 + int(random.uniform(-10, 10)), 450 + int(random.uniform(-10, 10)), 200 + int(random.uniform(-10, 10)), 50)
]
apps._exec(apps.Rule.count(1000, 5, 2), func, "任务执行中")
# 快手 # 看视频+小窗看书,同步执行
# kuaishouLite = apps.KuaishouLite(wikoHi70m) sync_task(bgWatchVideoTask, minReadBookTask)
# kuaishouLite.watch_video(apps.Rule.count(500, 12, 2))
# kuaishouLite.read_book(apps.Rule.count(1000, 5, 2))
# 抖音
# douyinLite = apps.DouyinLite(wikoHi70m)
# 视频红包
# douyinLite.watch_video(apps.Rule.count(1000, 7, 2))
# 支付宝
# alipay = apps.Alipay(wikoHi70m)
# 视频红包
# alipay.watch_video(apps.Rule.count(1000, 7, 2))
# 头条
# toutiaoLite = apps.ToutiaoLite(wikoHi70m)
# toutiaoLite.watch_video(apps.Rule.count(500, 8, 2))
# toutiaoLite.watch_ad(apps.Rule.count(500, 45, 2))
# 番茄畅听
# tomatoListen = apps.TomatoListen(wikoHi70m)
# tomatoListen.read_book(apps.Rule.count(1000, 5, 2))
# tomatoListen.watch_video(apps.Rule.count(500, 12, 3))
# 番茄小说
# tomatoFiction = apps.TomatoFiction(wikoHi70m)
# tomatoFiction.read_book(apps.Rule.count(1000, 5, 2))
# tomatoFiction.watch_video(apps.Rule.count(500, 10, 2))
# 任务完成锁屏 # 任务完成锁屏
wikoHi70m.lock() wikoHi70m.lock()
@@ -62,6 +34,17 @@ def main():
except KeyboardInterrupt: except KeyboardInterrupt:
print("程序被用户中断") print("程序被用户中断")
# 多任务同步执行
def sync_task(task1, task2):
th1 = threading.Thread(target = lambda :task1.start())
th2 = threading.Thread(target = lambda :task2.start())
th1.start()
th2.start()
th1.join()
th2.join()
if __name__ == "__main__": if __name__ == "__main__":
main() main()