免费的异步事件处理方案

背景

在我们日常开发的过程中,消息队列是一个非常常见的组件。例如执行一些批量任务,我们可以拆分任务后,通过消息队列来进行执行,起到削峰的作用。

我也有个场景:做邮件发送。邮件发送本身是一个比较简单的功能,但是邮件发送本身是一个比较耗时的操作,如果有多封邮件需要发送,不可避免地会遇到执行时间较长的情况。在一些云服务器上,通常会有系统级的超时设置。比如说百度云bch,php执行的超时时间估算在60秒以下,一次请求如果超过这么长时间没有执行结束并返回结果,执行就可能失败了。如果一个脚本中,连续发送几封邮件,就有可能触发到这个60秒的超时配置,导致执行失败。


可行的做法

因此,我先做了个操作,把邮件发送功能单独拎出来作为一个服务提供出来,暂时先通过控制每次调用发送量的方式,避免触发到超时。


接下来,就要考虑,如何通过消息队列来进行异步触发执行。


我的做法是在一台vps上,搭建了RabbitMQ,并基于Workerman运行了一个服务进行队列消息的订阅、消费。这样,原先接口之间的同步调用,就可以通过向RabbitMQ中写入消息的方式,变成异步调用。而且,也可以根据需要设置回调,在QueueService调用接口Interface2之后,再通知到Interface1所对应的回调接口。

image.png

方案本身没有太大的问题,但是我为了降低vps的续费成本,隔几年会换用其他有新帐户优惠的机器,RabbitMQ也要重新部署。我就考虑,是否有长期免费(或价格极低)的队列服务,可以满足我的需要呢?


如果想要实现异步事件的处理,不仅仅是一个消息队列组件的事儿,还需要有一个“消费者”(类似上图的QueueService)。消息队列,各个云厂商基本都会有,无非价格多少,或者有多少免费额度的事儿;可是消费者,总不能又是一套VPS吧?


其实反过来看一下这个消费者,其实功能本身很简单,无非就是根据订阅、接收到的消息,去发起一个http的请求,基本没有多余的逻辑。消费者本身不需要存储数据,也没有上下文的关联,每次请求都是新的,每次请求基本也是很快可以执行完成(几秒,或者最多十几秒),这其实是很典型的“函数计算”场景。

各个云厂商的能力

国内、国外各个云厂商,除了基础的云服务器之外,也都会提供各种名称迥异的云服务供开发者使用。比如百度BCH,就相当于是虚拟主机,阿里云RDS,就是各类数据库服务。


消息队列,不同厂商也基本都会提供,有的基于RabbitMQ,有的基于RocketMQ,也有基于Kafka的。函数计算,不少厂商也有提供,比如腾讯云、百度云等,基本都有一些免费额度。


但是,国内的云服务厂商,提供的免费额度基本都是有有效期的,比如3个月内有效。而消息队列的使用,基本一个月都是需要几百块钱的投入。这个钱吧,也不能说贵,但对我这样,一个月可能最多就几千次、上万次调用的场景,确实不太值。

CloudFlare

经过我的精打细算,先盯上的是强大又慷慨的CloudFlare。


CF的Workers功能,基本就等同于函数计算。它可以针对一次请求进行各种处理,通过js实现一些相对复杂的逻辑。比如说我之前基于CF Workers搭建ChatGPT API的反向代理,这让我的调用方便了很多。关键是,免费额度,足以支撑我日常的使用。


既然CF Workers可以通过js实现各种复杂逻辑,那么订阅消息,去实现一个http接口的访问自然是易如反掌。那么关键就看CF是否还有对应的消息队列功能了。CF确实提供了Queues这个功能,而且号称是免费的。可是!最关键的,可是!Queues需要配合Workers的Paid Plan使用,也就是说,必须在Workers这项能力上花钱付费,才能使用免费的Queue消息队列功能,最便宜的是一个月5刀。


贵么?不算贵。基本上,一年的花费,和国内云厂商一个月的花费比较接近。不过,既然还是要花钱,那这还不能成为我的第一选择。

Oracle

Oracle提供了“Always Free”计划,也就是说,注册成功的用户,除了赠送限期的美元额度之外,还可以永久免费地使用一些资源。最最值得的是可以有2台永久免费的vps!还有一些数据库、网络、日志等免费资源可以用。


稍微研究下,会发现Oracle的队列、函数计算功能,没有包含在Always Free计划中,所以需要升级到付费计划(不一定需要真付钱,根据实际使用情况付费)。升级之后,消息队列、函数计算,也分别有每个月上百万次调用的免费额度。这个免费额度,实际也绝对够我使用了。


其实,我是真想用Oracle的。可惜,最最最大的问题是,Oracle VPS帐户注册,是一个非常玄学的问题。注册时,需要绑定信用卡,但是这个卡是否会通过验证,就很难讲。信用卡支持Visa、Master、美运,我分别试过,都失败过。但根据网上的说法,即使失败也没事,过段时间重新注册,也还是有机会通过。网上普遍的情况是注册十几次可能都不一定通过一次,所以说是玄学问题。


这就导致我一直无法在Oracle上试用起来。


很久之后,我的一张美运卡,神奇地通过验证了。可惜这个时候,我已经在其他的平台完成了队列和函数计算的搭建,就没再在Oracle上做这个尝试了。


AWS

亚马逊的AWS也是个非常强大、慷慨的云服务厂商,只可惜,很多功能真的挺难用。。。AWS提供了1年的免费期,注册帐户后,所有的服务在1年内都可以免费使用,这对比国内动不动30天的免费,最多3个月的部分服务免费来说,真是良心极了。


而且,即使超出了1年的使用期限,还是有很多服务,依然有免费的额度供使用。比如我用到的消息队列、函数计算等等。而且AWS是一个真正的云服务厂商,相比CloudFlare来说,提供的服务更加全面。


唯一的问题是,AWS的使用真的很复杂。各类配置、名词让人头疼。尤其是权限配置,是相当复杂,我也是摸索了好一阵子才大概知道要怎么配出可以跨应用使用的权限来。


当然,在“免费”的光环加持下,我依然有了足够的动力去克服了各种困难。


实现方案的探索

函数计算

先看函数计算。AWS的函数计算被称作“Lambda”,这个名字也很有意思。如果对编程有较深的了解,可能会知道“Lambda表达式”这么个概念。


AWS的Lambda,每个月有上百万的免费额度,也是绝对够用了。同时,它支持Java、Python、NodeJS等多种语言来进行实现。对我来说,需要的功能抽象一下,只需要完成消息体(例如Json)的解析、根据消息的参数进行一些判断、发起一个或多个http请求(GET或POST)这样一些简单的步骤,所以实际上,哪一种语言都ok。犹豫了一下,结合自己的熟悉程度,以及性能方面的综合考虑,选定了Python。


Python,AWS还提供了不同版本的支持,我选择了Python3.10这样一个比较新的版本,结果这个版本不少实现方式和我熟悉的版本不太一致,外加AWS做了一些定制,日志打印也有一些问题,让我用得有点儿磕磕绊绊。


代码我就不放了,不到100行的实现。核心的代码是基于urllib3,根据接收到消息内容中的http地址、方法、参数等,进行调用。


消息队列

接下来,我们就需要配置通过消息队列来进行函数执行的触发。AWS上去看各类消息队列,先会注意到Amazon MQ这个组件。但是研究了下,发现它其实只是12个月内免费使用。如果超出时间,需要按照使用量付费,以RabbitMQ为例,最小化的资源使用,基本上一个月需要25美元,这个价格就有点儿贵了。

image.png


除了MQ之外,其实还有一个叫做Simple Queue Service(简称SQS)的组件,同样提供了较为简单的消息队列功能。而这个SQS,也可以通过界面配置,和Lambda进行搭配,实现消息驱动函数执行的功能。重要的是,SQS每个月有百万级别的免费额度。

image.png


消息队列和Lambda的对接

界面上操作触发对接比较容易,在Lambda配置界面,通过“添加触发器”的按钮,就可以根据需要对接各种消息来源。

image.png


麻烦的是需要理解Lambda和SQS组件中各种配置项的含义,以及配置权限以满足Lambda和SQS两者的调用。


那么,SQS中如何塞入消息?这个其实也不难,文档中提供了一些调用的样例,我们可以通过各种语言的SDK,轻松实现对SQS的调用。


我自定义了一个消息体格式,主要是通过json封装http请求的描述,比如url、method、params、callback等。结果,这里又踩了一个比较大的坑。


我原先以为,我在SQS中塞入什么样格式的消息,在Lambda的函数中,就可以直接通过参数event来获取。结果,一直没有按我预期的那样触发http请求。(我传入的消息,是让函数去调用我的一个日志服务接口,这样,我就通过阿里云日志服务知道这个请求正确触发执行了。这部分的内容,可以参见我之前的文章:《统一日志服务器的设计实现》


我拿着这个消息,去Lambda中直接尝试调试,自然没问题。那么,问题出在哪里,或者如何去解决?我试着在Lambda中加入一些日志打印。这里又遇到了两个坑,一个是打印的日志,在AWS的CloudWatch Logs中会有延时,所以执行之后有可能好一阵子我都看不到日志;另外,我默认采用的是logging.info去写日志,这个日志级别的信息,在CloudWatch中也会不展示,最后我调整成了logging.warn去打印日志,才看到日志记录。


经过日志查询+文档检索,终于发现,消息进入到SQS,再传给Lambda的时候,会做封装,变成类似下面的消息结构,即“body”字段中的字符串才是我的消息体:

{
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "test",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "098f6bcd4621d373cade4e832627b4f6",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-1:111122223333:my-queue",
            "awsRegion": "us-east-1"
        }
    ]
}

我非常无语地去按照这样的格式,重新调整了我Lambda函数中的解析,发现,果然OK了。


换一个实现方式

调试完成后,发现了又一个大坑,那就是延时。


SQS本身支持“延时”的配置,即指定0秒-15分钟内的一个时间值,可以让消息延时对应时间后再被处理。但实际上,SQS自身也存在延时性(可能是处于性能考虑),即使我配置延时值为0秒,下游Lambda也可能要等上十几秒,才能够获得到这个消息。


这样的延时,也不是说完全不可接受,但总觉得不是特别完美。于是,我又继续翻看Lambda的文档,看看还可以通过什么方式来对它进行触发。


image.png

AWS针对Lambda的文档中,列出了一些消息订阅来源,但这些都不适配我需要的场景。我试着在Lambda的函数配置中增加触发器,终于发现了一个看起来不错的组件:Simple Notification Service,简称SNS。看了下文档,这不就是支持主题模式的消息队列嘛!


有了SQS的经验之后,在SNS上的配置也比较顺利。需要注意的无非两点,一个是代码中朝SNS发布消息的代码需要做一些修改;另外就是Lambda中解析消息的时候,格式上稍有变化。


经过试验,发现SNS的消息延迟很低,完全可以达到秒级内的调用。这下子,整个方案就串联起来了:

image.png

和原先的架构相比,其实变化不大,无非是消息队列、监听服务,分别被AWS的SNS、Lambda所替代,而且也不再需要Workerman的执行环境。关键是这一整套的组合,不再需要单独进行运维性质的维护,借助AWS提供的免费额度,大大降低了人力投入、金钱投入的成本。


目前,这套方案暂时先在我系统中的部分功能上开通了灰度使用,效果很不错!

本文链接:https://www.poisonbian.com/post/5052.html 转载需授权!

分享到:
原文链接:,转发请注明来源!
「免费的异步事件处理方案」评论列表

发表评论