1.本发明涉及大数据处理技术领域,尤其是一种数据全链路追踪分析方法、装置、电子设备及存储介质。
背景技术:
2.相关技术中,大数据平台的实时数据的采集计算通常会经历多个环节,任何一个环节出现异常,都会导致任务失败。在分析过程中,如果只分析其中一个环节的状态,则难以发现运行时的问题,从而可能导致看到的运行状态与实际的运行情况不一致。
技术实现要素:
3.本发明旨在至少解决现有技术中存在的技术问题之一。为此,本发明提出一种数据全链路追踪分析方法、装置、电子设备及存储介质,能够使得降低发生看到的运行状态与实际运行状态不一致的概率。
4.一方面,本发明实施例提供了一种数据全链路追踪分析方法,包括以下步骤:
5.记录多源采集数据的第一轨迹,根据所述第一轨迹生成图数据库的第一节点数据;
6.记录实时计算数据的第二轨迹;
7.记录入库结果的第三轨迹,根据所述第三轨迹生成所述图数据库的第二节点数据后,并计算所述第一节点数据与所述第二节点数据的第一关联关系;
8.根据所述第一节点数据、所述第二节点数据、所述第二轨迹和所述第一关联关系生成全链路图;
9.根据所述全链路图对数据全链路进行分析并展示。
10.在一些实施例中,所述记录多源采集数据的第一轨迹,根据所述第一轨迹生成图数据库的第一节点数据,包括:
11.将所述多源采集数据写入到消息中间件,所述消息中间件用于存储所述多源采集数据以及所述多源采集数据的第一轨迹;
12.通过异步多线程方式从所述消息中间件中读取所述第一轨迹上的多源采集数据,并将读取到的所述多源采集数据作为图数据库的第一节点数据。
13.在一些实施例中,所述通过异步多线程方式从所述消息中间件中读取所述第一轨迹上的多源采集数据,包括:
14.实时监听所述消息中间件的消息;
15.当监听到所述消息中间件中存在满足第一预设要求的消息进入时,获取满足所述第一预设要求的消息及对应的位移点;其中,多个所述位移点组成所述第一轨迹;
16.根据所述位移点,通过异步多线程方式读取满足所述第一预设要求的消息。
17.在一些实施例中,所述记录实时计算数据的第二轨迹,包括:
18.获取所述实时计算数据中的源数据;
19.对所述源数据作聚合计算,得到聚合数据;
20.计算源数据和所述聚合数据的第二关联关系;
21.根据所述第二关联关系生成所述实时计算数据的第二轨迹。
22.在一些实施例中,在所述根据所述第一节点数据、所述第二节点数据、所述第二轨迹和所述第一关联关系生成全链路图这一步骤后,所述方法还包括:
23.生成所述全链路图中每个数据的全局唯一识别码;
24.根据所述全局唯一识别码对所述全链路图进行分片存储,并根据所述全链路图中的数据类型设置索引方式。
25.在一些实施例中,在所述根据所述全链路图对数据全链路进行分析并展示这一步骤之前,所述方法还包括:
26.根据时间口径对每个采集的数据设置采集时间标签;
27.根据采集到的数据对应的处理序号生成计算过程图;
28.根据所述采集时间标签和所述计算过程图确定异常数据集。
29.在一些实施例中,所述根据所述全链路图对数据全链路进行分析并展示,包括:
30.根据查询数据在所述全链路图查询得到目标数据;
31.根据所述目标数据在所述全链路图中获取上下游数据后,获取所述上下游数据与所述目标数据的第三关联关系;
32.根据所述上下游数据、所述第三关联关系和所述异常数据集生成数据追踪分析图;
33.对所述数据追踪分析图进行展示。
34.另一方面,本发明实施例提供了一种数据全链路追踪分析装置,包括:
35.第一模块,用于记录多源采集数据的第一轨迹,根据所述第一轨迹生成图数据库的第一节点数据;
36.第二模块,用于记录实时计算数据的第二轨迹;
37.第三模块,用于记录入库结果的第三轨迹,根据所述第三轨迹生成所述图数据库的第二节点数据后,并计算所述第一节点数据与所述第二节点数据的第一关联关系;
38.第四模块,用于根据所述第一节点数据、所述第二节点数据、所述第二轨迹和所述第一关联关系生成全链路图;
39.第五模块,用于根据所述全链路图对数据全链路进行分析并展示。
40.另一方面,本发明实施例提供了一种电子设备,包括:
41.至少一个存储器,用于存储程序;
42.至少一个处理器,用于加载所述程序以执行所述的数据全链路追踪分析方法。
43.另一方面,本发明实施例提供了一种计算机存储介质,其中存储有计算机可执行的程序,所述计算机可执行的程序被处理器执行时用于实现所述的数据全链路追踪分析方法。
44.本发明实施例提供的一种数据全链路追踪分析方法,具有如下有益效果:
45.本实施例通过分别记录多源采集数据的第一轨迹、实时计算数据的第二轨迹以及入库结果的第三轨迹后,根据第一轨迹生成图数据库的第一节点数据,根据第三轨迹生成图数据库的第二节点数据后,计算第一节点数据与第二节点数据的第一关联关系,从而可
以通过该关联关系得到各个节点数据之间的关联程度,然后根据第一节点数据、第二节点数据、第二轨迹和第一关联关系生成全链路图后,根据全链路图对数据全链路进行分析并展示,从而可以使得展示看到的运行状态更符合实际运行状态,进而可以及时发现网络运行问题。
46.本发明的附加方面和优点将在下面的描述中部分给出,部分将从下面的描述中变得明显,或通过本发明的实践了解到。
附图说明
47.下面结合附图和实施例对本发明做进一步的说明,其中:
48.图1为本发明实施例一种数据全链路追踪分析方法的流程图;
49.图2为本发明实施例一种数据计算的过程的链路示意图;
50.图3为本发明实施例一种数据全链路示意图;
51.图4为本发明实施例一种实时计算任务的业务流程图;
52.图5为本发明实施例一种数据全链路追踪分析方法的应用场景示意图;
53.图6为本发明实施例一种采集数据记录实现示意图;
54.图7为本发明实施例一种异步处理消息示意图;
55.图8为本发明实施例一种计算点的数据轨迹记录示意图;
56.图9为本发明实施例一种计算过程的记录触发点示意图;
57.图10为本发明实施例一种数据高效写入过程示意图;
58.图11为本发明实施例一种索引构建示意图;
59.图12为本发明实施例一种问题数据集发现示意图;
60.图13为本发明实施例一种数据全链路追踪查询过程示意图。
具体实施方式
61.下面详细描述本发明的实施例,所述实施例的示例在附图中示出,其中自始至终相同或类似的标号表示相同或类似的元件或具有相同或类似功能的元件。下面通过参考附图描述的实施例是示例性的,仅用于解释本发明,而不能理解为对本发明的限制。
62.在本发明的描述中,需要理解的是,涉及到方位描述,例如上、下、前、后、左、右等指示的方位或位置关系为基于附图所示的方位或位置关系,仅是为了便于描述本发明和简化描述,而不是指示或暗示所指的装置或元件必须具有特定的方位、以特定的方位构造和操作,因此不能理解为对本发明的限制。
63.本发明的描述中,除非另有明确的限定,设置、安装、连接等词语应做广义理解,所属技术领域技术人员可以结合技术方案的具体内容合理确定上述词语在本发明中的具体含义。
64.本发明的描述中,参考术语“一个实施例”、“一些实施例”、“示意性实施例”、“示例”、“具体示例”、或“一些示例”等的描述意指结合该实施例或示例描述的具体特征、结构、材料或者特点包含于本发明的至少一个实施例或示例中。在本说明书中,对上述术语的示意性表述不一定指的是相同的实施例或示例。而且,描述的具体特征、结构、材料或者特点可以在任何的一个或多个实施例或示例中以合适的方式结合。
65.相关技术中,大数据平台的实时数据的采集计算通常会经历多个环节,当出现问题的时候,往往在某一个环节很难定位与分析问题,需要从采集源、消息中间件、计算程序到目标库的全数据计算链路环节中去查找问题出现的原因,并且由于数据是实时流动的,也比较难固定某一个时间段来核对数据的一致性。目前查找问题的方法主要存在以下缺陷:
66.第一点、实时计算任务运行状态判断复杂。由于计算的任务是由多个环节组成,任何一个环节出现异常,都会导致任务失败,如果只获取计算环节的状态,会导致看到的运行状态与实际的运行情况不一致,无法及时发现运行时的问题。
67.第二点、数据丢失难发现。由于实时计算会做一些数据实时清洗、加工之类的操作,这样在实时入库后的数据与采集时数据会存在一致性核对困难的问题,不能确定是做了过滤操作,还是数据丢失,导致数据不一致。
68.第三点、数据稽核难。由于实时数据是流式过来的,时刻都有数据在流动,读取数据的时间与数据入库的时间总是存一些时间差,这样对给定时间段的数据总量的对比,变得很困难,不能通过总量是否一致来确认数据是否准确,也无法准确评估是否存在数据丢失的情况。
69.第四点、多源数据联合实时计算时,数据全链路难绘制。由于计算过程中会有数据聚合、合并之类的操作,如无跟踪数据计算的过程数据,便无法完整地绘制数据链路,无法明确数据丢失是计算过滤还是异常情况。
70.第五点、数据分析难。在大数据量、高并发、多环节的计算过程中,一些错误通常发生在某些小的环节,基本不会导致计算任务停止,但是会导致当前的部分数据丢失,此时没有全链路的数据跟踪分析,无法快速定位到实时流程的哪个环节出了问题,更无法提供数据丢失问题的根因分析。
71.通过上述背景描述可知,通过在多个源端的数据采集、消息存储、数据计算、数据入库等多个环节,其中入库端存在多种不同类型的介质,例如文件存储ftp、hdfs,数据库存储hive、mysql等,存在的环节越多,出现问题的概率越大,丢数据的机会也越多。因此,如何跟踪数据运行轨迹,使得在数据运行过程中一旦出现问题,可以快速根据数据全链路分析数据的运行轨迹,从而找到数据问题的根本原因是当前继续解决的问题。示例性地,以如下场景为例:
72.环节1:数据采集4条记录:
73.采集数据a:{user:a1,age:23,degree:bachelor,salary:3000,city:hn}
74.采集数据b:{user:b1,age:32,degree:master,salary:5000:city:hn}
75.采集数据c:{user:c1,age:22,degree:bachelor,salary:4000:city:hn}
76.采集数据d:{user:d1,age:24,degree:bachelor,salary:3000:city:gz}
77.环节2:根据age《30来过滤数据可得:
78.采集数据a:{user:a1,age:23,degree:bachelor,salary:3000,city:hn}
79.采集数据c:{user:c1,age:22,degree:bachelor,salary:4000:city:hn}
80.采集数据d:{user:d1,age:24,degree:bachelor,salary:3000:city:gz}
81.环节3:计算,avg(salary),group by city可得:
82.数据:{city:hn,avg:3500},{city:gz,avg:3000}
83.环节4:入库数据(数据丢失)后可得:
84.数据:{city:hn,avg:3500}
85.从以上场景例子可知存在过滤的数据、计算合并的数据、丢失的数据,如果没有数据的全链路追踪分析,要查找数据的问题,当前做法主要有:
86.第一、通过查看业务逻辑代码的实现过程,查看日志中关键信息“age》=30”的数据,找到采集数据b:{user:b1,age:32,degree:master,salary:5000:city:hn},确认此条数据是过滤的数据。
87.第二、查看日志分析,并根据计算的group条件,发现“采集数据a”与“采集数据c”进行了计算合并,确认这两条数据是参与聚合计算,没有丢失数据。
88.第三、查看日志,应该合并生成两条数据,但只入库了一条数据,通过日志分析确认“{city:gz,avg:3000}”数据丢失。
89.第四、通过日志分析,需要业务实现过程事先记录日志,如果没有详细的日志记录,便无法通过日志分析数据的轨迹,实时数据核对便无法进行,同时分析日志也需要耗费比较长的时间,不便于快速实现实时任务的数据核对,无法有效找到数据问题根因。
90.综上可知,实时计算任务通过采集、消息存储、计算与入库的环节时,其数据在这些环节流转过程中,也发生了相应的变化,有可能丢失了数据、过滤了数据、计算合并了数据,对每条数据发生情况,都需要有明确的依据,能够证明业务逻辑计算过程没有发生数据丢失的情况,同时也可以快速找到并分析出数据计算的过程,可以方便快速对数据问题进行根因分析。
91.基于此,参照图1,本发明实施例提供了一种数据全链路追踪分析方法,本发明实施例的方法可应用于大数据处理平台对应的处理器、服务器或云端。其中,大数据处理平台可以用于处理网络通信过程的各个环节生成的数据、也可以用于处理多个设备之间的交互数据。
92.可以理解的是,本发明实施例的方法在应用过程中,包括但不限于以下步骤:
93.步骤s110、记录多源采集数据的第一轨迹,根据第一轨迹生成图数据库的第一节点数据;
94.在本实施例中,多源采集数据是指通过多个方式进行数据采集或者在多个节点进行数据采集。本实施例可以通过将多源采集数据写入到消息中间件后,通过异步多线程方式从消息中间件中读取第一轨迹上的多源采集数据,并将读取到的多源采集数据作为图数据库的第一节点数据。其中,消息中间件用于存储多源采集数据以及多源采集数据的第一轨迹。并且,在通过异步多线程方式从消息中间件中读取第一轨迹上的多源采集数据时,可以通过实时监听消息中间件的消息;当监听到消息中间件中存在满足第一预设要求的消息进入时,获取满足第一预设要求的消息及对应的位移点,然后根据位移点,通过异步多线程方式读取满足第一预设要求的消息。其中,多个位移点组成第一轨迹。满足第一预设要求的消息是指在当前节点下存储到消息中间件内的消息。
95.可以理解的是,本实施例为提升多源数据采集的吞吐量,采用数据采集与实时计算解耦的模式,将多源的采集数据先存储到消息中间件中,然后从消息中间件读取消息进行计算。其中,消息中间件可以保障存储的数据在一定时间内的重复计算,为数据完整性提供有力的保障。
96.在数据写入到消息中间件的同时,通过异步机制再将数据写一份到dgraph(图数据库),图数据库用来存储采集数据的轨迹。在本实施例中,由于此阶段是数据最初形态,不需要计算数据间的关联关系,只需要存储图数据库中的节点数据。其中,每条数据按节点数据格式进行存储,其存储格式为:p(表名称:英文名称{主键字段:值,字段名:值,...}。
97.由于采集阶段数据记录量较大,在记录数据的轨迹时,即要保障数据正常的处理不受影响,又要可以记录数据的轨迹,所以本实施例采用异步写入数据的机制,利用消息中间件可以重复消费的特点,配置一个特定的消费组,例如g_pc_graph,通过此消费组异步多线程读取数据,并将数据按图数据的格式存储为图数据的节点数据。
98.步骤s120、记录实时计算数据的第二轨迹;
99.在本实施例中,可以通过先获取实时计算数据中的源数据后,对源数据作聚合计算,得到聚合数据,接着计算源数据和聚合数据的关联关系作为第二关联关系,根据第二关联关系生成实时计算数据的第二轨迹。可以理解的是,数据流经到此处,会发生数据合并、过滤、聚合等之类的计算操作,此时数据间会产生某些关联关系。在本实施例中,聚合包括数据的加法、减法、平均值、并集、交集等处理方式。以如下的数据为例:
100.数据a:{user:a1,age:23,degree:bachelor,salary:3000,city:hn}
101.数据b:{user:b1,age:25,degree:master,salary:5000:city:hn}
102.如图2所示,对上述数据a与数据b做数据的聚合计算,avg(age)group by city(即对数据a和数据b中的年龄求均值,并按照城市类型进行分组),会产生新的数据c:{age:24,city:hn},在完成数据计算时候,通过获取算子参与计算的数据a与数据b,同时截取计算的算子为avg与group by的操作,最后计算完成结果后,截取最终的数据c为:顶点p{聚合:tt{age:24,city:hn}}。再通过快速查找算法,定位到数据a、数据b与数据c,根据算子的数据流向,建立数据a
‑‑
》数据c、数据b
‑‑
》数据c的关联关系,其关系的值为“平均聚合计算”,通过建立点与点之间的边的关联关系,从而记录计算过程的数据轨迹作为第二轨迹。
103.步骤s130、记录入库结果的第三轨迹,根据第三轨迹生成所述图数据库的第二节点数据后,并计算第一节点数据与第二节点数据的第一关联关系;
104.在本实施例中,一个计算任务在计算完成后,会将数据落盘。在数据落盘时,也形成了全链路的最后的节点数据,记录图数据库的顶点数据:p(目标表名称:英文名称{主键字段:值,字段名:值,...},再记录点与上游节点的关联关系(第一关联关系),即边的关联关系v。
105.步骤s140、根据第一节点数据、第二节点数据、第二轨迹和第一关联关系生成全链路图;
106.在本实施例中,在得到全链路图后,利用分布式图数据库的能力,可以快速基于数据属性、主键与类型等值定位到数据,根据数据可以展示数据的全链路图,可以清晰看见这些数据是否被合并,还是入库。同时提供快速查询出一段时间内未入库的所有记录,以match(匹配)查询来查询数据为例,查询的语句如下:
107.match(x:user)where x.username=
‘
a1’return x;//根据指定的条件查询定位到数据
108.match(x)-[r:聚合算子]-(y)where r.关联='聚合'return x,r,y;//根据数据、以及数据之间的关联关系来查询链路数据。
[0109]
在本实施例中,还可以通过对数据的全链路分析,利用查询语句查询指定时间段内未到达最后节点的数据,形成问题数据集的展示能力,其查询语句如下所示:
[0110]
match path=((start{n:采集数据})-[r:all]-(!end{e:入库数据}))//查询从采集数据到未正常入库的所有数据,其中!end表示未到最后节点的数据
[0111]
with start,end,path
[0112]
where start_time=》st and end_time《=et and task_id=?
[0113]
return collect(start,r,end);
[0114]
步骤s150、根全链路图对数据全链路进行分析并展示。
[0115]
在本实施例中,可以根据记录的图数据的关联关系,生成全链路追踪分析结果图,同时对在大数据量下的数据链路采用异步加载的方式,逐层加载链路的下层数据,可以根据数据链路分析的结果,发现数据流转过程,利于对数据的分析,生成的分析结果如图3所示的数据全链路展示图,从图3中的链路分析展示可以明显看到,“数据d”是被过滤的数据。
[0116]
示例性地,以客户管理系统为例,客户关系管理系统每天都产生大量的受理订单数据、订单竣工数据、产品实例数据,从这些实时的数据流中生成一张宽表,包括产品实例、订单竣工日期、产品类型信息,要实现此业务的实时计算任务,如图4所示,大致逻辑为:从来源数据流“受理订单数据流”、“订单竣工数据流”、“产品实例数据流”以及维表数据“产品类型数据”中过滤、数据补全、数据合并,形成“产品实例数据”后,将产品实例数据入库到目标库中。
[0117]
以上场景业务过程中每天有超过百万的受理订单消息与竣工消息数据,以及产品实例消息数据,在实时流转与计算过程很容易产生数据丢失等各种不同情况的异常,为对计算与同步过程的数据进行跟踪,可快速分析全数据的链路,需要记录数据的轨迹,实现步骤如下:
[0118]
步骤一、获取来源数据也即采集数据。通过增加采集时的监控器可以监控到采集到kafka中的数据流,在监控器中实现将数据解析入库到图数据中的作用。
[0119]
步骤二、数据计算环节:在过滤、合并的算子中增加拦截器,实时进行监听:
[0120]
(1)算子前的数据即输入数据,通过异步的方式入库到图数据中。
[0121]
(2)记录过滤后的数据到图数据中,同时记录输入数据与过滤数据的关联关系为“过滤”。
[0122]
(3)记录参与计算合并的数据到图数据中,同时记录输入数据与参与计算的关联关系为“合并”。
[0123]
(4)记录算子算出的数据到图数据中。
[0124]
步骤三、入库环节:通过对入库的算子增加拦截器,实现输入数据拦截与记录、输出数据拦截与记录到图数据中,同时记录输入与入库数据的关联关系为“入库”。
[0125]
步骤四、通过查询语句可以很容易在上千万的数据中秒级查询出数据全链路过程,秒级分析出未正常入库的问题数据,如下的语句可以在秒级内实现数据从采集到入库以及是否过滤的整个过程的全链路展示。
[0126]
match(prod:产品实例)where prod.product_id=
‘
1000100’return prod;//查数据
[0127]
match(prod)-[r:过滤/合并]-(y)where r.关联=’过滤’or’合并’return prod,
r,y;//查询数据关联关系
[0128]
match path=((start{n:采集数据})-[r:all]-(!end{e:入库数据}))//查询问题数据。
[0129]
基于上述内容可知,本实施例解决实时任务的计算过程中,数据流转经过多个环节时,通过跟踪与记录数据,从而形成全链路的数据轨迹,针对数据的轨迹可以进行数据的全链路分析,发现数据运行与计算过程中的问题,从而有利于分析数据被使用情况,协助快速找到数据是否存在丢失、异常等问题。同时由于实时流转的数据量大、并发多,需要能够快速地记录数据轨迹并不对计算任务产生影响,支撑大并发任务实时高效写入;数据量大对查询展示的性能是一个比较大的挑战,要实现秒级内可以针对亿级大数据量的数据全链路的血缘展示,有效提升分析数据、查找定位问题的效率。
[0130]
在一些实施例中,将本实施例的数据全链路追踪分析方法应用于如图5所示场景时,包括但不限于以下步骤:
[0131]
步骤s510、采集数据的轨迹记录。
[0132]
本实施例实时计算任务通过对日志等数据的采集,将数据存储到kafka(消息中间件中),通过异步多线程的程序来消费采集的数据,消费时通过配置特定的消费组如:g_pc_graph,然后将数据存储到图数据库中。具体地,异步多线程的程序实现逻辑如图6所示,包括但不限于以下步骤:
[0133]
步骤s610、利用flink cdc技术将数据库的日志数据采集kafka消息中间件,根据表名称进行hash分布,确保一个表的数据只同步到一个分区中,计算方式如下:
[0134]
分区号=hash(表名称)%总分区数
[0135]
步骤s620、采集程序通过异步多线程的方式对应topic的分区,其实现方式:
[0136]
(1)通过flink分布式计算框架实时监听消息中间件的消息。
[0137]
onaynsconsumerlistener{
[0138]
//有新消息时会触发消费处理
[0139]
onlistener(message msg){
[0140]
//此处开启多线程异步处理逻辑的实现
[0141]
}
[0142]
}
[0143]
(2)当有新的消息进来会触发监听的程序,由程序获取消息及位移点(offsetid)。
[0144]
(3)开启异步处理消息的程序,让消息处理不影响到采集程序。其中,异步消息的过程如图7所示,生成了一个2^8的环形队列,写入数据保留一个指标,读取数据保留指标,读与写相互隔离,读取按环的顺序位置(读位置=读位置%2^8)读取消息数据后清空数据;写数据先在环上找位置(写位置=写位置%2^8)后,再将数据写入到此位置对应的区域上。其中,“%”是指将其他变量置入字符串特定位置以生成新字符串的操作。
[0145]
步骤s630、轨迹记录存储。
[0146]
经过上步处理的消息,再通过格式转换,可直接存储图数据库的顶点数据,并且不需要记录数据的关联关系,形式如下:
[0147]
create(数据对象名1:采集数据{主键:1,属性:xx,...}//创建点的数据并保存到图数据库中。
[0148]
步骤s520、实时计算的数据轨迹记录。
[0149]
由于实时计算的任务,会对数据进行过滤、聚合等之类的计算操作,数据会在中途被合并或丢失之类的,在此过程需要对数据计算过程的轨迹进行完整的记录,确保对每条数据可追踪查看,记录实现步骤如图8所示,包括但不限于以下步骤:
[0150]
步骤s810、在计算点增加对数据的跟踪切入点,当数据流到此算子时,同时触发数据的计算轨迹记录的动作,通过aop切面的思维,分别触发在数据输入时、计算时、数据输出时触发记录的动作,如图9所示,包括但不限于以下步骤:
[0151]
步骤s910、通过切面机制,针对所有算子配置切面的方式为:pointcut(*action),对所有action结尾的算子均执行切面配置的三个方法。
[0152]
步骤s920、设计切面的实现,增加计算点前触发点方法beforeaction,可以记录输入的数据;增加计算中的方法actioning,可以记录计算中参与计算的数据;增加计算后的方法afteraction,可以记录输出的数据。
[0153]
步骤s820、修改算子的实现方法名称,将需要追踪记录的算子方法的名称后缀加上action,如sumaction(...),表示求和的算子,为确保前端使用方式不改变,修改为:sum(...){sumaction(...)},在原有的sum方法调用sumaction,然后在sumaction中实现原有sum的内容。
[0154]
步骤s830、通过beforeaction方法记录进入计算点前的数据,记录的数据主要包括:数据体(表、字段、主键的元数据以及对应的值),以及与上级算子的关联关系,实现方式如下步骤:
[0155]
步骤s831、创建输入数据:create point(数据对象名1:输入数据1{主键:1,属性:xx,...})//创建点输入数据并保存。
[0156]
步骤s832、创建输入数据与上级算子的关联关系,如下:
[0157]
create edge(采集数据1:rela-》采集关系:输出数据1)//创建上级采集数据与下游输入数据的关联关系,并标明关联关系的类型。
[0158]
步骤s840、通过actioning方法记录计算中的数据,给参与计算的数据打标签。
[0159]
本实施例的实现过程包括但不限于以下步骤:
[0160]
步骤s841、create point(数据对象名1:计算数据1{主键:1,属性:xx...,算子名称:sum})//创建顶点数据,并记录数据对应的算子;
[0161]
步骤s842、创建算子,给每次参与计算的算子产生一个唯一标识:
[0162]
create point(算子名称 唯一标识:计算算子{主键:标识,名称:xx,计算时间:...,其他的属性})//创建算子的节点,并保存;
[0163]
步骤s843、创建算子与参与计算的数据节点的关联关系:
[0164]
create vertex(计算数据1:rela-》计算算子:算子唯一标识)//创建了参与计算的数据与算子之间的关联关系。
[0165]
步骤s850、通过afteraction方法记录计算后的数据,将计算后的数据保存到图数据中。本实施例的实现过程包括先不限于以下步骤:
[0166]
步骤s851、create point(数据对象名1:输出数据1{主键:1,属性:xx...,记录时间:xx})//创建输出数据,并保存;
[0167]
步骤s852、记录输出与上游算子的关联关系:
[0168]
create edge(输出数据1:rela-》计算算子:算子唯一标识)//创建了输出的数据与算子之间的关联关系。
[0169]
步骤s860、数据写入性能保障。由于实时计算要求性能高、响应快,为确保记录数据过程不影响性能,在写入频繁并且并发较大的时候,如图10所示,增加异步队列的机制,具体包括但不限于以下步骤:
[0170]
配置队列。根据数据多少的需求来配置好队列的长度,如queue:2(队列长度),queue:10(队列长度)。
[0171]
写入队列的数据。除了数据,还包括执行的语句,这样在读取的时候可以直接调用执行,减少了解释与拼装的时间。
[0172]
在处理队列的数据时,程序会根据资源情况、队列的数据堆积多少自动开启并发数或配置最大与最小的并发来处理堆积数据,保障可以高效的写入图数据库中。
[0173]
读取到的数据,通过对写入端进行适当的缓存,将原本单条写入的操作变成批量操作,以提升写入的效率。
[0174]
步骤s530、入库的结果数据轨迹记录。
[0175]
本实施例入库是实时计算任务的最后一个环节,也就是sink算子,需要增加pointcut(*sink)的拦截器,在数据流经此处时,只需要增加一个aftersink的拦截环节即可,记录已经入库的记录数据,操作步骤包括但不限于以下步骤:
[0176]
步骤s531、配置增加对sink操作的拦截,增加监听的配置pointcut(*sink),可以监听所有后缀为sink的方法。
[0177]
步骤s532、修改入库方法,为保障入库的逻辑与上下游调用不变,原方法依然保留,增加xxxsink结尾的方法,并将入库的逻辑在xxxsink中实现,例如write(...){wirtesink(...)},将write的逻辑在wirtesink实现。
[0178]
步骤s533、数据写入主要写入点的数据与边的数据,实现方式如下:
[0179]
create point(数据:入库数据{主键:标识,名称:xx,入库时间:...,其他的属性,})//创建入库点的数据;
[0180]
create edge(上游数据1:rela-》sink:当前数据1)//创建入库数据与上游输出的结果数据的关联关系。
[0181]
步骤s534、写入性能提升,采用步骤s520中的步骤s860的异步队列机制来保障写入的性能,同时也不影响正常实时计算流程的性能。
[0182]
步骤s540、实现大数量下的数据快速查询。
[0183]
本实施例为实现大数据量下数据快速查询,对图数据存储进行适当调整。具体可以通过生成全链路图中每个数据的全局唯一识别码(全局唯一id),然后根据全局唯一识别码对全链路图进行分片存储,并根据全链路图中的数据类型设置索引方式。可以理解的是,本实施例的实现过程包括但不限于以下步骤:
[0184]
步骤s541、为每个数据生成一个uid,由于所有的数据都按着边和节点来存储,构成了一个有向图,图为每个数据对象分配一个全局唯一的id,称为uid,uid是一个64位无符号整数,从1开始单调递增。
[0185]
步骤s542、为快速查询数据的关联关系,会将关联关系按边的数据进行分片存储,对相同的边形成一个分片,每一分片数据压缩成一个key-value对。其中key是为边的uid
值,value是一个称为点的集合的数据结构,当边对应的关联较多时,其分片也适当自动进行拆分,主要拆分key的值当前边的uid_n(表示从1递增的数字),如下所示:
[0186]
key=《边的uid》value《点的uid,点的uid,点的uid,...》
[0187]
key1=《边的uid_1》value《点的uid,点的uid,点的uid,...》
[0188]
key2=《边的uid_n》value《点的uid,点的uid,点的uid,...》
[0189]
步骤s543、拆分的依据,针对key-value中value对其做了整数压缩优化,每256个uids组成一个value。
[0190]
步骤s544、实现快速的关联关系查询,改进过的存储方式非常有利于连接和遍历,一个边遍历只需要一个kv查询。例如,找到x的所有关联数据,只需要用《边x的uid》以及《边x的uid_n》当做key进行查询,就能获得点数据的集合,包含了所有关联数据的uids;寻找x和y的公共关联数据,只需要查询《边x的uid》以及《边x的uid_n》和《边y的uid》以及《边y的uid_n》的点的集合,然后求两者的交集,判断查询uid_n是否终止,只需要根据n从1开始是否可以获取value的值,如果value为空则可以终止查询。
[0191]
步骤s545、为方便、快速实现大数据量的数据查询,不同的数据类型建立相应的索引等方式,例如对于string类型,支持正则表达式、fulltext、term、exact和hash索引;对于datetime类型,支持按年、月、日、小时的索引;对于geo类型,支持nearby、within等索引,如图11所示,构建索引的步骤如下:
[0192]
(1)索引跟数据一样,以key-value的形式存储。
[0193]
(2)key索引的key是《边,token》,token是索引的分词器,从value中获取,例如hash索引生成的token就是hash函数所计算的hash值。
[0194]
(3)对同一个数据可以创建一个或多个分词器来产生tokens。更新的时候,首先从旧值的tokens的values中删除相应的uid,然后把uid添加到新产生的tokens的values里。
[0195]
从图11可知,由于key1建立了fulltext索引,所以会调用fulltext分词器,由key1的值"running fast"得到run和fast两个tokens,它们分别和key1组成两个key,然后把uid 0xa添加到各自的values里。这样,一个值的索引就转化为后端存储的kvs。
[0196]
步骤s546、通过上述的改造,将大量的数据进行拆分,并按图的结构存储,通过索引、节点等可以快速找到数据,其关联的数据根据key值可快速定位到一批相关的数据,实现了快速的关联关系的构建,减少了关系型或nosql下关联关系查询,提升大数据量下的数据快速查询的性能。
[0197]
步骤s550、问题数据的自动发现展示。
[0198]
本实施例可以根据时间口径对每个采集的数据设置采集时间标签,然后根据采集到的数据对应的处理序号生成计算过程图,再根据采集时间标签和所述计算过程图确定异常数据集。可以理解的是,由于所有的数据都存储到图数据库中,并且保存了数据之间的关联关系,对于没有最终入库或没有参与计算的数据,可以通过查找算法自动从图数据库发现某一时间段内入库的数据问题,其中时间以统一的“采集时间标签”为准,操作步骤如图12所示,包括但不限于以下步骤:
[0199]
步骤s551、明确统计的时间口径,以数据采集时创建的时间为准,后续所有流转的数据均会加上“采集时间”,并查看采集算子的输出是否存在异常数据。
[0200]
步骤s552、由于每个算子都有加上输入的数据集、输出的数据集、参与计算的数据
集,按计算过程的序号,列举出中间所有算子的计算过程图。
[0201]
步骤s553、除第一个采集与最后一个入库的算子外,采用并发与排队机制对每个算子进行问题数据的统计计算,公式为:问题数据=(输入数据集-参与计算的数据集)u输出异常数据集。本步骤中的“u”是指求两边数据的并集。
[0202]
步骤s554、最后一个入库算子的问题数据统计公式为:问题数据=输入数据集-入库数据集。
[0203]
步骤s555、最后将每个算子计算出来的问题数据进行合并,形成最终的计算任务的问题数据集,即:算子1的问题数据集u算子2的问题数据集u...u入库算子的问题数据集。
[0204]
步骤s560、数据全链路追踪分析展示。
[0205]
本实施例通过根据查询数据在全链路图查询得到目标数据;然后根据目标数据在全链路图中获取上下游数据后,获取上下游数据与目标数据的第三关联关系;再根据上下游数据、第三关联关系和异常数据集生成数据追踪分析图,对数据追踪分析图进行展示。其中,展示过程可以是人机交互界面进行展示。可以理解的是,在实时任务的计算与数据同步任务过程中,数据的计算过程、输入、输出与异常的数据均有记录,要实现数据的全链路追踪分析,可以从任一个数据点开始向下与向下追踪分析数据链路,具体的操作步骤如图13所示,包括但不限于以下步骤:
[0206]
步骤s561、输入查询的数据,可以用数据类型、数据主键以及其他的属性,从图数据库查询出想要的数据,如果有多条数据,可多选或单选其中一条数据。
[0207]
步骤s562、对选中的数据可快速查询其关联的上下游数据,并查询出来数据之间的关联关系(第三关联关系)。
[0208]
步骤s563、然后分别根据上下游的数据继续查询相关的数据及关联关系的数据。
[0209]
步骤s564、重复步骤s563,直到追踪到向下的数据为入库的数据、向上的数据为采集的数据为止。
[0210]
步骤s565、根据查询的数据绘制出数据追踪分析图。
[0211]
由上述示例过程可知,本实施例的实施过程及其效果可以总结如下:
[0212]
第一点、通过多源大数据量的采集,利用无锁式的异步数据处理框架来记录采集的数据轨迹,并存储到图数据库中。
[0213]
第二点、利用切面编程技术,在实时计算的每个算子中增加事前、事中、事后的处理方法,实时跟踪记录每个算子的流入数据、处理中的数据、处理完成的数据到图数据库。
[0214]
第三点、跟踪记录每个数据流入的算子、流出的算子,来记录数据加工计算过程的关联关系,形成数据流向的链路图。
[0215]
第四点、记录数据入库环节,以及流入库的数据与入库数据的关联关系到图。
[0216]
第五点、提供数据快速查询定位能力,从而可以快速从大量数据中绘制出数据的加工过程图,便于快速发现与定位数据的问题。
[0217]
第六点、在多源数据联合实时计算时,通过记录的数据轨迹,从而可以快速绘制出数据的聚合、合并,过滤等经过的算子的过程图,便于快速定位数据走向,帮助分析问题。
[0218]
本发明实施例提供了一种与图1方法对应的数据全链路追踪分析装置,包括:
[0219]
第一模块,用于记录多源采集数据的第一轨迹,并根据第一轨迹生成图数据库的第一节点数据;
[0220]
第二模块,用于记录实时计算数据的第二轨迹;
[0221]
第三模块,用于记录入库结果的第三轨迹,并根据第三轨迹生成所述图数据库的第二节点数据后,计算第一节点数据与第二节点数据的第一关联关系;
[0222]
第四模块,用于根据第一节点数据、第二节点数据、第二轨迹和第一关联关系生成全链路图;
[0223]
第五模块,用于根据全链路图对数据全链路进行分析并展示。
[0224]
本发明方法实施例的内容均适用于本装置实施例,本装置实施例所具体实现的功能与上述方法实施例相同,并且达到的有益效果与上述方法达到的有益效果也相同。
[0225]
本发明实施例提供了一种电子设备,包括:
[0226]
至少一个存储器,用于存储程序;
[0227]
至少一个处理器,用于加载所述程序以执行图1所示的数据全链路追踪分析方法。
[0228]
本发明方法实施例的内容均适用于本电子设备实施例,本电子设备实施例所具体实现的功能与上述方法实施例相同,并且达到的有益效果与上述方法达到的有益效果也相同。
[0229]
本发明实施例提供了一种计算机存储介质,其中存储有计算机可执行的程序,所述计算机可执行的程序被处理器执行时用于实现图1所示的数据全链路追踪分析方法。
[0230]
本发明方法实施例的内容均适用于本存储介质实施例,本存储介质实施例所具体实现的功能与上述方法实施例相同,并且达到的有益效果与上述方法达到的有益效果也相同。
[0231]
此外,本发明实施例还提供了一种计算机程序产品或计算机程序,该计算机程序产品或计算机程序包括计算机指令,该计算机指令存储在计算机可读存介质中。计算机设备的处理器可以从计算机可读存储介质读取该计算机指令,处理器执行该计算机指令,使得该计算机设备执行图1所示的数据全链路追踪分析方法。
[0232]
上面结合附图对本发明实施例作了详细说明,但是本发明不限于上述实施例,在所属技术领域普通技术人员所具备的知识范围内,还可以在不脱离本发明宗旨的前提下作出各种变化。此外,在不冲突的情况下,本发明的实施例及实施例中的特征可以相互组合。