当前位置: 首页 > 网络游戏 >

Celery的基本使用-环球实时

时间:2023-06-29 22:20:00 来源:博客园


(相关资料图)

day11——celerycelery介绍架构和安装
# celery:分布式的异步任务框架,主要用来做:异步任务    延时任务    定时任务---》如果只想做定时任务,可以不使用celery,有别的选择# celery 框架,原理1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)2)celery服务为为其他项目服务提供异步解决任务需求的注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求人是一个独立运行的服务 | 医院也是一个独立运行的服务正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求   # celery架构消息中间件(broker):消息队列:可以使用redis,rabbitmq,咱们使用redis任务执行单元(worker):真正的执行 提交的任务任务执行结果存储(banckend):可以使用mysql,redis,咱们使用redis# 安装celery-pip install Celery    -释放出可执行文件:celery,由于 python解释器的script文件夹再环境变量,任意路径下执行celery都能找到   # celery不支持win,所以想再win上运行,需要额外安装eventletwindows系统需要eventlet支持:pip3 install eventletLinux与MacOS直接执行:3.x,4.x版本:celery worker -A demo -l info    5.x版本:     celery -A demo worker -l info -P eventlet
celery执行异步任务
# 基本使用1 在虚拟环境中装celery和eventlet    2 写个demo.py文件,实例化得到app对象,注册任务    from celery import Celery        import time        broker = "redis://127.0.0.1:6379/1"  # 消息中间件 redis        backend = "redis://127.0.0.1:6379/2"  # 结果存储 redis        app = Celery(__name__,broker=broker,backend=backend)        @app.task  # 变成celery的任务了        def add(a,b):            print("运算结果是",a+b)            time.sleep(1)            return a + b    3 启动worker(worker监听消息队列,等待别人提交任务,如果有则直接执行,没有则卡在这等待任务)    celery -A demo worker -l info -P eventlet          4 别人提交任务,提交完成会返回一个id号,后期使用id号查询,至于这个任务有没有被执行,取决于worker有没有启动,如果worker没有启动会存放在消息队列中等待    from demo import add        res = add.delay(2,2)    print(res)        5 提交任务的人,使用id号查看结果    from demo import app        # celery的包下        from celery.result import AsyncResult        id = "042a8fc1-6b0f-4ad6-bf72-edefa657a52f"        if __name__ == "__main__":            a = AsyncResult(id=id, app=app)            if a.successful():  # 正常执行完成                result = a.get()  # 任务返回的结果                print(result)            elif a.failed():                print("任务失败")            elif a.status == "PENDING":                print("任务等待中被执行")            elif a.status == "RETRY":                print("任务异常后正在重试")            elif a.status == "STARTED":                print("任务已经开始被执行")
包结构celery
# 使用步骤1 新建包:celery_task    2 再包下新建 celery.py 必须叫它,里面实例化得到app对象    from celery import Celery    broker = "redis://127.0.0.1:6379/1"  # 消息中间件 redis    backend = "redis://127.0.0.1:6379/2"  # 结果存储 redis    app = Celery(__name__, broker=broker, backend=backend, include=["celery_task.course_task","celery_task.home_task","celery_task.user_task"])        3 新建任务py文件:user_task.py   course_task.py  home_task.py    -以后跟谁相关的任务,就写在谁里面        from .celery import app        import time        @app.task        def send_sms(mobile, code):            time.sleep(2)            print("%s手机号,发送短信成功,验证码是:%s" % (mobile, code))            return True    4 启动worker,以包启动,来到包所在路径下    celery -A 包名 worker -l info -P eventlet        celery -A celery_task worker -l info -P eventlet           5 其他程序,导入任务,提交任务即可    from celery_task.user_task import send_sms        res = send_sms.delay(1999999333, 8888)        print(res)  # f33ba3c5-9b78-467a-94d6-17b9074e8533        6 其它程序,查询结果    from celery_task.celery import app    # celery的包下    from celery.result import AsyncResult    id = "51a669a3-c96c-4f8c-a5fc-e1b8e2189ed0"    if __name__ == "__main__":        a = AsyncResult(id=id, app=app)        if a.successful():  # 正常执行完成            result = a.get()  # 任务返回的结果            print(result)        elif a.failed():            print("任务失败")        elif a.status == "PENDING":            print("任务等待中被执行")        elif a.status == "RETRY":            print("任务异常后正在重试")        elif a.status == "STARTED":            print("任务已经开始被执行")
celery执行延迟任务和定时任务
# celery 可以做-异步任务    -延迟任务---》延迟多长时间干任务    -定时任务:每天12点钟,每隔几秒。。。   -如果只做定时任务,不需要使用celery这么重,apscheduler(自己去研究)        # 异步任务-导入异步任务的函数    -函数.delay(参数)   # 延迟任务-导入异步任务的函数    -函数.apply_async(kwargs={"mobile":"1896334234","code":8888},eta=时间对象)   # 定时任务:在app所在的文件下配置- 1 配置app.conf.beat_schedule = {        "send_sms": {            "task": "celery_task.user_task.send_sms",            "schedule": timedelta(seconds=5),            "args": ("1822344343", 8888),        },        "add_course": {            "task": "celery_task.course_task.add_course",            # "schedule": crontab(hour=8, day_of_week=1),  # 每周一早八点            "schedule": crontab(hour=11, minute=38),  # 每天11点35,执行            "args": (),        }    }    -2 启动beatcelery  -A celery_task beat -l info        启动worker    celery -A celery_task worker -l info -P eventlet    -3 到了时间,beat进程负责提交任务到消息队列---》worker执行
django中使用celery
# 使用步骤1 把之前写好的包,copy到项目根路径下    2 在xx_task.py 中写任务    from .celery import app        @app.task        def add_banner():            from home.models import Banner            Banner.objects.create(title="测试", image="/1.png", link="/banner", info="xxx",orders=99)            return "banner增加成功"    3 在celery.py 中加载django配置    import os    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")        4 视图函数中,导入任务,提交即可    class CeleryView(APIView):            def get(self, request):                res = add_banner.delay()                return APIResponse(msg="新增banner的任务已经提交了")    5 启动worker,等待运行即可    celery -A celery_task worker -l info -P eventlet
接口缓存
# 所有接口都可以改造,尤其是查询所有的这种接口,如果加入缓存,会极大的提高查询速度# 首页轮播图接口:获取轮播图数据,加缓存---》咱们只是以它为例class BannerView(GenericViewSet, ListModelMixin):    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by("orders")[:settings.BANNER_COUNT]    serializer_class = BannerSerializer    def list(self, request, *args, **kwargs):        """          1 先去缓存中查一下有没有数据          2 如果有,直接返回,不走父类的list了(list在走数据库)          3 如果没有,走父类list,查询数据库          4 把返回的数据,放到缓存中        """        data = cache.get("home_banner_list")        if not data:  # 缓存中没有            print("走了数据库")            res = super().list(request, *args, **kwargs)  # 查询数据库            # 返回的数据,放到缓存中            data = res.data.get("data")  # {code:100,msg:成功,data:[{},{}]}            cache.set("home_banner_list", data)        return APIResponse(data=data)    # 公司里可能会这么写-写一个查询所有带缓存的基类    -写个装饰器,只要一配置,就自动带缓存        # 双写一致性问题:缓存数据和数据库数据不一致了-写入数据库,删除缓存    -写入数据库,更新缓存    -定时更新缓存
关键词:
x 广告
x 广告

Copyright ©  2015-2022 西南游戏网版权所有  备案号:皖ICP备2022009963号-8   联系邮箱:39 60 29 14 2@qq.com