简介: 众所周知,游戏行业在当今的互联网行业中算是一棵常青树。在疫情之前的 2019 年,中国游戏市场营收规模约 2884.8 亿元,同比增长 17.1%。2020 年因为疫情,游戏行业更是突飞猛进。玩游戏本就是中国网民最普遍的娱乐方式之一,疫情期间更甚。据不完全统计,截至 2019 年,中国移动游戏用户规模约 6.6 亿人,占中国总网民规模 8.47 亿的 77.92%,可见游戏作为一种低门槛、低成本的娱乐手段,已成为大部分人生活中习以为常的一部分。
众所周知,游戏行业在当今的互联网行业中算是一棵常青树。在疫情之前的 2019 年,中国游戏市场营收规模约 2884.8 亿元,同比增长 17.1%。2020 年因为疫情,游戏行业更是突飞猛进。玩游戏本就是中国网民最普遍的娱乐方式之一,疫情期间更甚。据不完全统计,截至 2019 年,中国移动游戏用户规模约 6.6 亿人,占中国总网民规模 8.47 亿的 77.92%,可见游戏作为一种低门槛、低成本的娱乐手段,已成为大部分人生活中习以为常的一部分。
对于玩家而言,市面上的游戏数量多如牛毛,那么玩家如何能发现和认知到一款游戏,并且持续的玩下去恐怕是所有游戏厂商需要思考的问题。加之 2018 年游戏版号停发事件,游戏厂商更加珍惜每一个已获得版号的游戏产品,所以这也使得“深度打磨产品质量”和“提高运营精细程度”这两个游戏产业发展方向成为广大游戏厂商的发展思路,无论是新游戏还是老游戏都在努力落实这两点:
这里我们重点来看新游戏。一家游戏企业辛辛苦苦研发三年,等着新游戏发售时一飞冲天。那么问题来了,新游戏如何被广大玩家看到?
首先来看看游戏行业公司的分类:
这三种类型的业务,有专注于其中某一领域的独立公司,也有能承接全部业务的公司,但无论那一种,这三者之间的关系是不会变的:
所以不难理解,想让更多的玩家看到你的游戏,游戏发行和运营是关键。通俗来讲,如果你的游戏出现在目前所有大家熟知的平台广告中,那么最起码游戏的新用户注册数量是很可观的。因此这就引入了一个关键词:买量。
根据数据显示,2019 年月均买量手游数达 6000+ 款,而 2018 年仅为 4200 款。另一方面,随着抖音、微博等超级 APP 在游戏买量市场的资源倾斜,也助推手游买量的效果和效率有所提升,游戏厂商更愿意使用买量的方式来吸引用户。
但需要注意的是,在游戏买量的精准化程度不断提高的同时,买量的成本也在节节攀升,唯有合理配置买量、渠道与整合营销之间的关系,才能将宣发资源发挥到最大的效果。
通俗来讲,买量其实就是在各大主流平台投放广告,广大用户看到游戏广告后,有可能会点击广告,然后进入游戏厂商的宣传页面,同时会采集用户的一些信息,然后游戏厂商对采集到的用户信息进行大数据分析,进行进一步的定向推广。
游戏厂商花钱买量,换来的用户信息以及新用户注册信息是为持续的游戏运营服务的,那么这个场景的核心诉求就是采集用户信息的完整性。
比如说,某游戏厂商一天花 5000w 投放广告,在某平台某时段产生了每秒 1w 次的广告点击率,那么在这个时段内每一个点击广告的用户信息要完整的被采集到,然后入库进行后续分析。这就对数据采集系统提出了很高的要求。
这其中,最核心的一点就是系统暴露接口的环节要能够平稳承载买量期间不定时的流量脉冲。在买量期间,游戏厂商通常会在多个平台投放广告,每个平台投放广告的时间是不一样的,所以就会出现全天不定时的流量脉冲现象。如果这个环节出现问题,那么相当于买量的钱就打水漂了。
上图是一个相对传统的数据采集系统架构,最关键的就是暴露 HTTP 接口回传数据这部分,这部分如果出问题,那么采集数据的链路就断了。但这部分往往会面临两个挑战:
通常情况下,在游戏有运营活动之前,会提前通知运维同学,对这个环节的服务增加节点,但要增加多少其实是无法预估的,只能大概拍一个数字。这是在传统架构下经常会出现的场景,这就会导致两个问题:
我们可以通过函数计算 FC 来取代传统架构中暴露 HTTP 回传数据这部分,从而完美解决传统架构中存在问题,参考文章:《资源成本双优化!看 Serverless 颠覆编程教育的创新实践》。
先来看架构图:
传统架构中的两个问题均可以通过函数计算百毫秒弹性的特性来解决。我们并不需要去估算营销活动会带来多大的流量,也不需要去担心和考虑对数据采集系统的性能,运维同学更不需要提前预备 ECS。
因为函数计算的极致弹性特性,当没有买量、没有营销活动的时候,函数计算的运行实例是零。有买量活动时,在流量脉冲的情况下,函数计算会快速拉起实例来承载流量压力;当流量减少时,函数计算会及时释放没有请求的实例进行缩容。所以 Serverless 架构带来的优势有以下三点:
从上面的架构图可以看到,整个采集数据阶段,分了两个函数来实现,第一个函数的作用是单纯的暴露 HTTP 接口接收数据,第二个函数用于处理数据,然后将数据发送至消息队列 Kafka 和数据库 RDS。
==========
我们打开函数计算控制台,创建一个函数:
创建好函数之后,我们通过在线编辑器编写代码:
# -*- coding: utf-8 -*- import logging import json import urllib.parse HELLO_WORLD = b'Hello world!n' def handler(environ, start_response): logger = logging.getLogger() context = environ['fc.context'] request_uri = environ['fc.request_uri'] for k, v in environ.items(): if k.startswith('HTTP_'): # process custom request headers pass try: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except (ValueError): request_body_size = 0 # 接收回传的数据 request_body = environ['wsgi.input'].read(request_body_size) request_body_str = urllib.parse.unquote(request_body.decode("GBK")) request_body_obj = json.loads(request_body_str) logger.info(request_body_obj["action"]) logger.info(request_body_obj["articleAuthorId"]) status = '200 OK' response_headers = [('Content-type', 'text/plain')] start_response(status, response_headers) return [HELLO_WORLD]
此时的代码非常简单,就是接收用户传来的参数,我们可以调用接口进行验证:
可以在函数的日志查询中看到此次调用的日志:
同时,我们也可以查看函数的链路追踪来分析每一个步骤的调用耗时,比如函数接到请求→冷启动(无活跃实例时)→准备代码→执行初始化方法→执行入口函数逻辑这个过程:
从调用链路图中可以看到,刚才的那次请求包含了冷启动的时间,因为当时没有活跃实例,整个过程耗时 418 毫秒,真正执行入口函数代码的时间为 8 毫秒。
当再次调用接口时,可以看到就直接执行了入口函数的逻辑,因为此时已经有实例在运行,整个耗时只有 2.3 毫秒:
===========
第一个函数是通过在函数计算控制台在界面上创建的,选择了运行环境是 Python3,我们可以在官方文档中查看预置的 Python3 运行环境内置了哪些模块,因为第二个函数要操作 Kafka 和 RDS,所以需要我们确认对应的模块。
从文档中可以看到,内置的模块中包含 RDS 的 SDK 模块,但是没有 Kafka 的 SDK 模块,此时就需要我们手动安装 Kafka SDK 模块,并且创建函数也会使用另一种方式。
Funcraft 是一个用于支持 Serverless 应用部署的命令行工具,能帮助我们便捷地管理函数计算、API 网关、日志服务等资源。它通过一个资源配置文件(template.yml),协助我们进行开发、构建、部署操作。
所以第二个函数我们需要使用 Fun 来进行操作,整个操作分为四个步骤:
Fun 提供了三种安装方式:
文本示例环境为 Mac,所以使用 npm 方式安装,非常的简单,一行命令搞定:
sudo npm install @alicloud/fun -g
安装完成之后。在控制终端输入 fun 命令可以查看版本信息:
$ fun --version 3.6.20
在第一次使用 fun 之前需要先执行 fun config 命令进行配置,按照提示,依次配置 Account ID、Access Key Id、Secret Access Key、 Default Region Name 即可。其中 Account ID、Access Key Id 你可以从函数计算控制台首页的右上方获得:
fun config
? Aliyun Account ID _*_01
? Aliyun Access Key ID _*_qef6j
? Aliyun Access Key Secret _*_UFJG
? Default region name cn-hangzhou
? The timeout in seconds for each SDK client invoking 60
? The maximum number of retries for each SDK client 3
新建一个目录,在该目录下创建一个名为 template.yml 的 YAML 文件,该文件主要描述要创建的函数的各项配置,说白了就是将函数计算控制台上配置的那些配置信息以 YAML 格式写在文件里:
ROSTemplateFormatVersion: '2015-09-01' Transform: 'Aliyun::Serverless-2018-04-03' Resources: FCBigDataDemo: Type: 'Aliyun::Serverless::Service' Properties: Description: 'local invoke demo' VpcConfig: VpcId: 'vpc-xxxxxxxxxxx' VSwitchIds: [ 'vsw-xxxxxxxxxx' ] SecurityGroupId: 'sg-xxxxxxxxx' LogConfig: Project: fcdemo Logstore: fc_demo_store dataToKafka: Type: 'Aliyun::Serverless::Function' Properties: Initializer: index.my_initializer Handler: index.handler CodeUri: './' Description: '' Runtime: python3
我们来解析以上文件的核心内容:
目录结构为:
服务和函数的模板创建好之后,我们来安装需要使用的第三方依赖。在这个示例的场景中,第二个函数需要使用 Kafka SDK,所以可以通过 fun 工具结合 Python 包管理工具 pip 进行安装:
fun install --runtime python3 --package-type pip kafka-python
执行命令后有如下提示信息:
此时我们会发现在目录下会生成一个.fun文件夹 ,我们安装的依赖包就在该目录下:
现在编写好了模板文件以及安装好了我们需要的 Kafka SDK 后,还需要添加我们的代码文件 index.py,代码内容如下:
# -*- coding: utf-8 -*- import logging import json import urllib.parse from kafka import KafkaProducer producer = None def my_initializer(context): logger = logging.getLogger() logger.info("init kafka producer") global producer producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092') def handler(event, context): logger = logging.getLogger() # 接收回传的数据 event_str = json.loads(event) event_obj = json.loads(event_str) logger.info(event_obj["action"]) logger.info(event_obj["articleAuthorId"]) # 向Kafka发送消息 global producer producer.send('ikf-demo', json.dumps(event_str).encode('utf-8')) producer.close() return 'hello world'
代码很简单,这里做以简单的解析:
登录函数计算控制台,可以看到通过 fun 命令部署的服务和函数:
进入函数,也可以清晰的看到第三方依赖包的目录结构:
==========
目前两个函数都创建好了,下面的工作就是由第一个函数接收到数据后拉起第二个函数发送消息给 Kafka。我们只需要对第一个函数做些许改动即可:
# -*- coding: utf-8 -*- import logging import json import urllib.parse import fc2 HELLO_WORLD = b'Hello world!n' client = None def my_initializer(context): logger = logging.getLogger() logger.info("init fc client") global client client = fc2.Client( endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com", accessKeyID="your_ak", accessKeySecret="your_sk" ) def handler(environ, start_response): logger = logging.getLogger() context = environ['fc.context'] request_uri = environ['fc.request_uri'] for k, v in environ.items(): if k.startswith('HTTP_'): # process custom request headers pass try: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except (ValueError): request_body_size = 0 # 接收回传的数据 request_body = environ['wsgi.input'].read(request_body_size) request_body_str = urllib.parse.unquote(request_body.decode("GBK")) request_body_obj = json.loads(request_body_str) logger.info(request_body_obj["action"]) logger.info(request_body_obj["articleAuthorId"]) global client client.invoke_function( 'FCBigDataDemo', 'dataToKafka', payload=json.dumps(request_body_str), headers = {'x-fc-invocation-type': 'Async'} ) status = '200 OK' response_headers = [('Content-type', 'text/plain')] start_response(status, response_headers) return [HELLO_WORLD]
如上面代码所示,对第一个函数的代码做了三个地方的改动:
def my_initializer(context): logger = logging.getLogger() logger.info("init fc client") global client client = fc2.Client( endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com", accessKeyID="your_ak", accessKeySecret="your_sk" )
这里需要注意的时,当我们在代码里增加了初始化方法后,需要在函数配置中指定初始化方法的入口:
global client client.invoke_function( 'FCBigDataDemo', 'dataToKafka', payload=json.dumps(request_body_str), headers = {'x-fc-invocation-type': 'Async'} )
invoke_function 函数有四个参数:
如此设置,我们便可以验证通过第一个函数提供的 HTTP 接口发起请求→采集数据→调用第二个函数→将数据作为消息传给 Kafka 这个流程了。
到这里有些同学可能会有疑问,为什么需要两个函数,而不在第一个函数里直接向 Kafka 发送数据呢?我们先来看这张图:
当我们使用异步调用函数时,在函数内部会默认先将请求的数据放入消息队列进行第一道削峰填谷,然后每一个队列在对应函数实例,通过函数实例的弹性拉起多个实例进行第二道削峰填谷。所以这也就是为什么这个架构能稳定承载大并发请求的核心原因之一。
============
在游戏运营这个场景中,数据量是比较大的,所以对 Kafka 的性能要求也是比较高的,相比开源自建,使用云上的 Kafka 省去很多的运维操作,比如:
总的来说,就是一切 SLA 都有云上兜底,我们只需要关注在消息发送和消息消费即可。
所以我们可以打开 Kafka 开通界面,根据实际场景的需求一键开通 Kafka 实例,开通 Kafka 后登录控制台,在基本信息中可以看到 Kafka 的接入点:
将默认接入点配置到函数计算的第二个函数中即可。
.... producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092') ....
然后点击左侧控制台 Topic 管理,创建 Topic:
将创建好的 Topic 配置到函数计算的第二个函数中即可。
... # 第一个参数为Topic名称 producer.send('ikf-demo', json.dumps(event_str).encode('utf-8')) ...
上文已经列举过云上 Kafka 的优势,比如动态增加 Topic 的分区数,我们可以在 Topic 列表中,对 Topic 的分区数进行动态调整:
单 Topic 最大支持到 360 个分区,这是开源自建无法做到的。
接下来点击控制台左侧 Consumer Group 管理,创建 Consumer Group:
至此,云上的 Kafka 就算配置完毕了,即 Producer 可以往刚刚创建的 Topic 中发消息了,Consumer 可以设置刚刚创建的 GID 以及订阅 Topic 进行消息接受和消费。
在这个场景中,Kafka 后面往往会跟着 Flink,所以这里简要给大家介绍一下在 Flink 中如何创建 Kafka Consumer 并消费数据。代码片段如下:
final ParameterTool parameterTool = ParameterTool.fromArgs(args); String kafkaTopic = parameterTool.get("kafka-topic","ikf-demo"); String brokers = parameterTool.get("brokers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092"); Properties kafkaProps = new Properties(); kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "ikf-demo"); FlinkKafkaConsumer<UserBehaviorEvent> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new UserBehaviorEventSchema(), kafkaProps); kafka.setStartFromLatest(); kafka.setCommitOffsetsOnCheckpoints(false); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<UserBehaviorEvent> dataStreamByEventTime = env.addSource(kafka);
以上就是构建 Flink Kafka Consumer 和添加 Kafka Source 的代码片段,还是非常简单的。
至此,整个数据采集的架构就搭建完毕了,下面我们通过压测来检验一下整个架构的性能。这里使用阿里云 PTS 来进行压测。
打开 PTS 控制台,点击左侧菜单创建压测/创建 PTS 场景:
在场景配置中,将第一个函数计算函数暴露的 HTTP 接口作为串联链路,配置如下图所示:
接口配置完后,我们来配置施压:
这里因为资源成本原因,并发用户数设置为 2500 来进行验证。
从上图压测中的情况来看,TPS 达到了 2w 的封顶,549w+ 的请求,99.99% 的请求是成功的,那 369 个异常也可以点击查看,都是压测工具请求超时导致的。
至此,整个基于 Serverless 搭建的大数据采集传输的架构就搭建好了,并且进行了压测验证,整体的性能也是不错的,并且整个架构搭建起来也非常简单和容易理解。这个架构不光适用于游戏运营行业,其实任何大数据采集传输的场景都是适用的,目前也已经有很多客户正在基于 Serverless 的架构跑在生产环境,或者正走在改造 Serverless 架构的路上。
作者:计缘
原文链接
本文为阿里云原创内容,未经允许不得转载