目录
1、效果预览
2、KBQA介绍
3、KBQA实现
3.1、问答系统设计
3.2、使用python链接Fuseki
3.2、分词实现
3.2.1、实体词处理
3.2.2、分词逻辑的实现
3.3、查询实现
3.3.1、单实体查询
3.3.2、多实体查询
4、业务逻辑的整合实现
5、一些补充
6、参考
本篇紧随之前的七篇文章,讲述了建立了知识图谱后,不希望仅仅在可视化和抽象推理的方向上得到应用,同时扩充考虑将其应用在问答系统方向;本篇属于一时兴起,写了一个简单的问答系统,通过使用自然语言处理技术,解析匹配用户问题意图,完成后生成查询模板,在知识库里进行检索,从而反馈最佳答案。
在代码逻辑实现上,考虑到工时资源等信息,因此没有完全按照网上KBQA最常见的REfO的精准匹配去写实现,也没有使用DL技术做模型,而是使用了传统的相似度计算做模糊匹配,因此代码量少很多,算是本篇的特色,应该网上找不到和我代码一样的版本。
代码依赖:
python : 3.6+,(库:SPARQL、jieba、macropodus)
fuseki + jena (同本系列的版本)
人物问答预览:
电影问答预览:
单实体的条件问答:
多实体问答:
KBQA:Knowledge Base Question Answer,由于我们将知识提取并生成了结构化数据,建立了知识库,因此在应用方向上,通过自然语言问答,通过对问题的解析和推理,利用知识库进行问答查询和分析,从而得到答案,这就是KBQA。
详细可以阅读,这位博主在知识普及上写的更好:https://zhuanlan.zhihu.com/p/25735572。
整体代码工程后续回传到Github上,地址后续补充,项目结构简单定义为:
上图是这个系统的简单架构示意,其中底层是数据层,主要是使用了TDB三元数据库,存储知识数据;倒数第二层是通信层,主要是依赖Jena+Fuseki的框架,包括规则推理机的配置等;倒数第三层是应用实现层,主要包括Fuseki的客户端实现,NLP自然语言处理,查询模板的生成应用实现等;最顶层是业务层,主要是对用户输入做提取,结果做解析展示的业务交互层,通过这四层的定义,实现我们的整体项目。
仅仅只有架构,是不足以完善我们的想法的,因此,我们基于某种产品化的考虑,考虑设计完善的业务流程,从用户接入使用,到数据解析,分析以及反馈系统,从而形成一个较为完整的闭环应用体系,因此,设计如下业务框架:
如上图所展示,考虑到一个完整的问答系统,必定不能百分百考虑到所有情况,因此,我们需要在业务处理上,考虑当遇到一些疑难杂症的时候,数据应该流转到哪些新模块去做处理,这样才有可能在未来不断地提高KBQA系统的正确率,上图便考虑增加新词发现,新问题发现来帮助系统完善应用广度,增加意图理解来完善深度,而考虑到需要增加KG和模板,那么就需要使用人工加机器二合一的答案判别和数据分析系统模块,这样逐渐形成一个完善的生产系统。我这里仅实现黄色部分,右边的没空实现,不过可以给大家提供一个不错的思路。
利用SPARQLWrapper的接口实现对feseki的调用,测试和使用前记得启动fuseki的服务器。
代码文件fuseki_query.py内容如下:
from SPARQLWrapper import SPARQLWrapper,JSON class Fuseki: def __init__(self,url ='http://localhost:3030/kg_movie/query'): self.sparql = SPARQLWrapper(url) print('sparql init successful ...') def query(self, query_str): self.sparql.setQuery(query_str) self.sparql.setReturnFormat(JSON) result = self.sparql.query().convert() return result def query_str_build(self,str): pass def test(): fuseki = Fuseki() query_str = r''' PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> PREFIX : <http://www.kg_movie.com#> SELECT * WHERE { ?x rdf:type :Comedian. ?x :actor_chName ?n. } LIMIT 10 ''' query_str = r''' PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> PREFIX : <http://www.kg_movie.com#> SELECT * WHERE { ?x :actor_chName '周星驰'. ?x :actor_bio ?n. } LIMIT 10 ''' res = fuseki.query(query_str) print(res) if __name__ == '__main__': test()
核心在class Fuseki这里,后续会被调用到。
首先要导出存储在mysql中的实体词,导出其中的movie和actor两张表的chName列,可以导出成txt:
导出后效果如下:
需要导出这两个文本的原因:
考虑到对用户话语进行关键词提取和分词,对于我们最关心的热点词,担心分词效果不好,因此将这两个实体数据提取出来,进行特殊词性标注,这样再分词环节就很容易提取出常用热点词。
我们对这两个文件进行词性标注,打上对应的词性,有关词性的解释,可以看jieba的链接:
jieba库的GitHub地址
这里分别使用到人名词性nr,专用词性nz。
处理actor.txt和movie.txt文件,代码文件process_word_dict.py:
from pprint import pprint def append_words_in_lines(file_name,append_str): n_dict = [] with open(file_name+'.txt', mode='r+' ,encoding='utf-8') as f: res = f.readlines() # pprint(res) for word in res: n_dict.append(word[:-1]+append_str) with open(file_name+"_dict.txt", mode='w+', encoding='utf-8') as f: for word in n_dict: f.write(word+'\n') # print(n_dict) if __name__ == '__main__': append_words_in_lines('actor', ' nr') append_words_in_lines('movie', ' nz')
运行一下即可,完成后生成actor_dict.txt和movie_dict.txt文件,内容如下图:
分词我们调用了国内非常有名的开源库,jieba库,这里对齐进行了简单封装,根据我们的需求,做以下步骤:
A.常用词字典的加载,加载actor_dict.txt和movie_dict.txt;
B.将一些常见连续词按照需求给强制切分开,例如,“喜剧演员”会有很大几率是一个词,我们电影图谱,“喜剧”和“演员”分别都是对应关键实体的属性和实体类型,因此需要强制切分开,当然,有一些情况可以不切分,这个需要用业务逻辑来判断。
C.分词和词性标注,不但将词且分开,也要把词性记录下来。
整体代码如下文件nlp.py:
# encoding=utf-8 import jieba import jieba.posseg as pseg from pprint import pprint class TextureProcess: def __init__(self): self.jieba = jieba self.pseg = pseg # 加载词典 def load_dicts(self, *path): for p in path: self.jieba.load_userdict(p) # 强制分词 def force_split(self,*group_list): for group in group_list: self.jieba.suggest_freq((group[0], group[1]), True) def sentence_split_tag(self,sentence): split_list = [] cut_sen = self.pseg.cut(sentence) for word, root in cut_sen: split_list.append({"word":word, "root":root}) return split_list if __name__ == '__main__': text_process = TextureProcess() text_process.load_dicts("actor_dict.txt", "movie_dict.txt") text_process.force_split(['喜剧', '演员'], ['出生', '日期']) text_res = text_process.sentence_split_tag("周星驰的个人简介")
查询的时候,参考了网上很多KBQA的实现,基本上最常见的是两种,一种是基于模板规则,使用正则表达式的精准匹配方式,另外一种是基于ML或DL训练处理的有监督模型,结合我们的图谱大小和数据现状,显然使用ML或DL训练不是很好,而且还需要对数据进行标注处理,这并不符合短时间内实现一个KBQA系统的初衷,因此考虑使用模板匹配,但仔细观看,模板匹配使用了正则表达,每种问题不同问法和相近词都必须考虑到,这并不符合我的想法,因此想要使用模糊匹配解决问题。
因此核心的考虑方案是,定义问题模板,有且仅对一种问题类型做一个示例,无论用户换几种问法,将其问句和示例进行相似度计算,相似度最高的认定为用户想要问的问题方向,如果相似度极低,那么可以简单认为要么是问题刁钻,要么是没考虑到这个问题需要再补充问题示例。
这里提及一下,无论使用传统方法,还是基于深度学习的方法,对用户问题的判断必定是基于某种规则的,例如,如果你没有定义过天上的白云有什么形状?这种问题,自然就没有对应的答案,无论分词还是意图提取,断句以及词性做的多准确,都没有用,因此只有你定义过天上的白云的答案,当用户问题是:万米深空的棉花糖都是啥样,机器才有可能回答“心型、圆形等”,至于这种刁钻问题,如何匹配,例如,万米深空对应天上,棉花糖对应白云,这种问题传统算法匹配成功率较低,较好的办法是交给基于大数据训练的深度网络,才有可能提高匹配正确率。
首先对单实体做一个类的定义:
class SingleEntityRule: def __init__(self, entity_root, segment, table_name): self.entity_root = entity_root self.segment = segment self.table_name = table_name
这段代码在temp_match.py中,属于部分片段。
其中entity_root是实体的词性,segment是用户的问题,table_name是我们查询对应的数据库表名。
接下来考虑问题模板规则的定义:
单实体匹配主要考虑到简单问答的匹配,如:周星驰的个人简介,刘德华的出生日期等,唐伯虎点秋香的演员有哪些?等等。
考虑到单实体还会有一些扩展补充问法,如:周星驰演过的电影的类型有哪些?周星驰合作过的导演有哪些?因此也做了实现,区别是需要查询的数据表是连续的。
具体定义如下,将每种问题定义一种问法,然后放到一个list里:
SingleEntityRules = [ SingleEntityRule(entity_root='nr', segment="的个人介绍?", table_name='bio'), SingleEntityRule(entity_root='nr', segment="的中文名叫什么?", table_name='chName'), SingleEntityRule(entity_root='nr', segment="的英文名叫什么?", table_name='foreName'), SingleEntityRule(entity_root='nr', segment="的国籍是什么?", table_name='nationality'), SingleEntityRule(entity_root='nr', segment="的星座是什么?", table_name='constellation'), SingleEntityRule(entity_root='nr', segment="的出生地点在哪里?", table_name='birthplace'), SingleEntityRule(entity_root='nr', segment="的生日是哪天?", table_name='birthday'), SingleEntityRule(entity_root='nr', segment="演过哪些电影?", table_name='repWorks'), SingleEntityRule(entity_root='nr', segment="获得了哪些成就?", table_name='achiem'), SingleEntityRule(entity_root='nr', segment="的经济公司?", table_name='brokerage'), SingleEntityRule(entity_root='nz', segment="的电影简介?", table_name='bio'), SingleEntityRule(entity_root='nz', segment="的中文名叫什么?", table_name='chName'), SingleEntityRule(entity_root='nz', segment="的英文名叫什么?", table_name='foreName'), SingleEntityRule(entity_root='nz', segment="的上映时间是什么时候?", table_name='prodTime'), SingleEntityRule(entity_root='nz', segment="的制片公司是哪家?", table_name='prodCompany'), SingleEntityRule(entity_root='nz', segment="的导演是谁?", table_name='director'), SingleEntityRule(entity_root='nz', segment="的编剧是谁?", table_name='screenwriter'), SingleEntityRule(entity_root='nz', segment="属于什么类型的电影?", table_name='genre'), SingleEntityRule(entity_root='nz', segment="的参演演员有哪些?", table_name='star'), SingleEntityRule(entity_root='nz', segment="的电影时长是多少?", table_name='length'), SingleEntityRule(entity_root='nz', segment="的上映时间是哪天?", table_name='releaseTime'), SingleEntityRule(entity_root='nz', segment="的语言类型是哪种?", table_name='language'), SingleEntityRule(entity_root='nz', segment="获得了哪些成就?", table_name='achiem'), # 实体 相关 实体(无定向) 的属性 SingleEntityRule(entity_root='nr', segment="演过的电影的类型是哪些?", table_name='chName&genre'), SingleEntityRule(entity_root='nr', segment="合作过的导演有哪些?", table_name='chName&director') ]
这段代码在temp_match.py中,属于部分片段。
规则定义完后,需要对规则进行查询匹配,大体的思路是,我们对用户的问题进行处理,然后遍历计算用户的问题,判断问题与问题模板里哪一个相似度最高,得分最高的则认为是用户想要了解的问题,这里我们使用了macropodus库做相似度计算,默认是使用jaccard相似度系数,非常方便。
但获取到用户正确的问题后,在对问题进行查询模板对照生成,由于我们生产的数据实体只有两种,一个是电影,一个是明星,因此,单实体只需要考虑这两种情况nr和nz,这样可被用于fuseki查询的query语句就完成了,具体代码如下:
class BaseQueryTemp: def __init__(self): self.query_head = r''' PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> PREFIX : <http://www.kg_movie.com#> SELECT * WHERE {''' self.query_end = r'''} LIMIT 10''' class QuerySETemp(BaseQueryTemp): def __init__(self): super().__init__() @staticmethod def _calc(sentence): max_score = 0 table_name = "" fin_entity_root = "" for SingleEntityRule in SingleEntityRules: score = macropodus.sim(sentence, SingleEntityRule.entity_root + SingleEntityRule.segment) if score > max_score: max_score = score fin_entity_root = SingleEntityRule.entity_root table_name = SingleEntityRule.table_name return fin_entity_root, table_name def get_query_temp(self, entity_word_list, sentence): entity_root, table_name = self._calc(sentence) core_query = "" if entity_root == "nz": core_query = r''' ?x :movie_chName '{0}'. ?x :movie_{1} ?res_o. '''.format(entity_word_list[0]['nz'], table_name) elif entity_root == "nr": if "&" in table_name : table_list = table_name.split("&") from pprint import pprint pprint(table_list) core_query = r''' ?x :actor_chName '{0}'. ?x :hasActedIn ?o. ?o :movie_{1} ?res_0. ?o :movie_{2} ?res_1. '''.format(entity_word_list[0]['nr'], table_list[0], table_list[1]) else: core_query = r''' ?x :actor_chName '{0}'. ?x :actor_{1} ?res_o. '''.format(entity_word_list[0]['nr'], table_name) return self.query_head + core_query + self.query_end
这段代码在temp_match.py中,属于部分片段。
其中 BaseQueryTemp是我们定义了一个基类,直接被继承即可。
QuerySETemp是我们的单实体模板类,其中_calc是计算相似度,get_query_temp是模板生产。
主要考虑的是联合查询,即实体和实体一起才能满足的条件,例如人和人的条件组合,电影和电影的条件组合,人和电影的条件组合,以及自由搭配实体数量的条件组合。
首先对多实体进行定义:
class MultiEntityRule: # type same 表示entity_root 相同,如 [nr nr] # type diff 表示entity_root 不相同,如 [nr nz] def __init__(self, entity_root_list, segment, link_name, table_name, belong, type='same', ): self.type = type self.entity_root_list = entity_root_list self.segment = segment self.link_name = link_name self.table_name = table_name self.belong = belong
在之前的单实体的基础上增加了link_name和belong和type,link_name考虑的是中间查询的节点,belong判断属于那哪种类型,例如是电影还是人物方向,type主要考虑是实体和实体之间的类别是否相同,需要触发不同的调用模板逻辑。
由于多实体我只想到了两种,因此,就定义了两个规则,如下:
# diff类型由于暂时没想到 MultiEntityRules = [ MultiEntityRule(type='same', entity_root_list=["nr"], segment="一起演过什么电影?", link_name=["hasActedIn"], table_name="chName", belong='movie'), MultiEntityRule(type='same', entity_root_list=["nz"], segment="同时参演的演员是谁?", link_name=["hasActor"], table_name="chName", belong='actor'), ]
然后计算相似度,生成模板 :
class QueryMETemp(BaseQueryTemp): def __init__(self): super().__init__() @staticmethod def _calc(type , sentence): max_score = 0 # list_entity_root = [] list_link_name = [] table_name = "" belong = "" for MultiEntityRule in MultiEntityRules: if type == 'same': score = macropodus.sim(sentence, MultiEntityRule.entity_root_list[0] + MultiEntityRule.segment) else: # diff score = macropodus.sim(sentence, MultiEntityRule.entity_root_list[0] + MultiEntityRule.entity_root_list[1] + MultiEntityRule.segment) if score > max_score: max_score = score # list_entity_root = MultiEntityRule.entity_root_list list_link_name = MultiEntityRule.link_name table_name = MultiEntityRule.table_name belong = MultiEntityRule.belong return list_link_name, table_name, belong def get_query_temp(self, entity_word_list, sentence): type = '' if all('nr' == root for root_word in entity_word_list for root in root_word) or all('nz' == root for root_word in entity_word_list for root in root_word): # same type = 'same' else: type = 'diff' entity_num = len(entity_word_list) list_link_name, table_name, belong = self._calc(type, sentence) # 生成模板 entity_query_list = [] mid_query_list = [] for index in range(entity_num): root_word = entity_word_list[index] for root in root_word: if root == 'nr': entity_query_list.append(r''' ?a{0} :actor_chName '{1}'.'''.format(index, root_word[root])) mid_query_list.append(r''' ?a{0} :hasActedIn ?last.'''.format(index)) elif root == 'nz': entity_query_list.append(r''' ?b{0} :movie_chName '{1}'.'''.format(index, root_word[root])) mid_query_list.append(r''' ?b{0} :hasActor ?last.'''.format(index)) else: print("传参类型错误 ... ") end_query = r''' ?last :{0}_{1} ?res_0.'''.format(belong,table_name) core_query = '' for entity_query in entity_query_list: core_query = core_query + entity_query for mid_query in mid_query_list: core_query = core_query + mid_query core_query = core_query + end_query return self.query_head + core_query + self.query_end if __name__ == '__main__': pass
生成模板考虑到有的时候用户可以问超过2个实体单位,因此使用了循环结构来生成,这样可以对应2个以上的实体查询模板额生成。
这里附上单实体和多实体查询模板的完整代码temp_match.py:
import macropodus class BaseQueryTemp: def __init__(self): self.query_head = r''' PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> PREFIX : <http://www.kg_movie.com#> SELECT * WHERE {''' self.query_end = r'''} LIMIT 10''' # Rule # [实体]有什么[实体]? # [实体]的[属性]? # [实体]和[实体]的[关系]? # [实体]的[条件属性]? # class WithoutEntityRule: def __init__(self): pass WithoutEntityRules = [ # WithoutEntityRule(key_word= ,) ] class MultiEntityRule: # type same 表示entity_root 相同,如 [nr nr] # type diff 表示entity_root 不相同,如 [nr nz] def __init__(self, entity_root_list, segment, link_name, table_name, belong, type='same', ): self.type = type self.entity_root_list = entity_root_list self.segment = segment self.link_name = link_name self.table_name = table_name self.belong = belong # diff类型由于暂时没想到 MultiEntityRules = [ MultiEntityRule(type='same', entity_root_list=["nr"], segment="一起演过什么电影?", link_name=["hasActedIn"], table_name="chName", belong='movie'), MultiEntityRule(type='same', entity_root_list=["nz"], segment="同时参演的演员是谁?", link_name=["hasActor"], table_name="chName", belong='actor'), ] class QueryMETemp(BaseQueryTemp): def __init__(self): super().__init__() @staticmethod def _calc(type , sentence): max_score = 0 # list_entity_root = [] list_link_name = [] table_name = "" belong = "" for MultiEntityRule in MultiEntityRules: if type == 'same': score = macropodus.sim(sentence, MultiEntityRule.entity_root_list[0] + MultiEntityRule.segment) else: # diff score = macropodus.sim(sentence, MultiEntityRule.entity_root_list[0] + MultiEntityRule.entity_root_list[1] + MultiEntityRule.segment) if score > max_score: max_score = score # list_entity_root = MultiEntityRule.entity_root_list list_link_name = MultiEntityRule.link_name table_name = MultiEntityRule.table_name belong = MultiEntityRule.belong return list_link_name, table_name, belong def get_query_temp(self, entity_word_list, sentence): type = '' if all('nr' == root for root_word in entity_word_list for root in root_word) or all('nz' == root for root_word in entity_word_list for root in root_word): # same type = 'same' else: type = 'diff' entity_num = len(entity_word_list) list_link_name, table_name, belong = self._calc(type, sentence) # 生成模板 entity_query_list = [] mid_query_list = [] for index in range(entity_num): root_word = entity_word_list[index] for root in root_word: if root == 'nr': entity_query_list.append(r''' ?a{0} :actor_chName '{1}'.'''.format(index, root_word[root])) mid_query_list.append(r''' ?a{0} :hasActedIn ?last.'''.format(index)) elif root == 'nz': entity_query_list.append(r''' ?b{0} :movie_chName '{1}'.'''.format(index, root_word[root])) mid_query_list.append(r''' ?b{0} :hasActor ?last.'''.format(index)) else: print("传参类型错误 ... ") end_query = r''' ?last :{0}_{1} ?res_0.'''.format(belong,table_name) core_query = '' for entity_query in entity_query_list: core_query = core_query + entity_query for mid_query in mid_query_list: core_query = core_query + mid_query core_query = core_query + end_query return self.query_head + core_query + self.query_end class SingleEntityRule: def __init__(self, entity_root, segment, table_name): self.entity_root = entity_root self.segment = segment self.table_name = table_name class QuerySETemp(BaseQueryTemp): def __init__(self): super().__init__() @staticmethod def _calc(sentence): max_score = 0 table_name = "" fin_entity_root = "" for SingleEntityRule in SingleEntityRules: score = macropodus.sim(sentence, SingleEntityRule.entity_root + SingleEntityRule.segment) if score > max_score: max_score = score fin_entity_root = SingleEntityRule.entity_root table_name = SingleEntityRule.table_name return fin_entity_root, table_name def get_query_temp(self, entity_word_list, sentence): entity_root, table_name = self._calc(sentence) core_query = "" if entity_root == "nz": core_query = r''' ?x :movie_chName '{0}'. ?x :movie_{1} ?res_o. '''.format(entity_word_list[0]['nz'], table_name) elif entity_root == "nr": if "&" in table_name : table_list = table_name.split("&") from pprint import pprint pprint(table_list) core_query = r''' ?x :actor_chName '{0}'. ?x :hasActedIn ?o. ?o :movie_{1} ?res_0. ?o :movie_{2} ?res_1. '''.format(entity_word_list[0]['nr'], table_list[0], table_list[1]) else: core_query = r''' ?x :actor_chName '{0}'. ?x :actor_{1} ?res_o. '''.format(entity_word_list[0]['nr'], table_name) return self.query_head + core_query + self.query_end SingleEntityRules = [ SingleEntityRule(entity_root='nr', segment="的个人介绍?", table_name='bio'), SingleEntityRule(entity_root='nr', segment="的中文名叫什么?", table_name='chName'), SingleEntityRule(entity_root='nr', segment="的英文名叫什么?", table_name='foreName'), SingleEntityRule(entity_root='nr', segment="的国籍是什么?", table_name='nationality'), SingleEntityRule(entity_root='nr', segment="的星座是什么?", table_name='constellation'), SingleEntityRule(entity_root='nr', segment="的出生地点在哪里?", table_name='birthplace'), SingleEntityRule(entity_root='nr', segment="的生日是哪天?", table_name='birthday'), SingleEntityRule(entity_root='nr', segment="演过哪些电影?", table_name='repWorks'), SingleEntityRule(entity_root='nr', segment="获得了哪些成就?", table_name='achiem'), SingleEntityRule(entity_root='nr', segment="的经济公司?", table_name='brokerage'), SingleEntityRule(entity_root='nz', segment="的电影简介?", table_name='bio'), SingleEntityRule(entity_root='nz', segment="的中文名叫什么?", table_name='chName'), SingleEntityRule(entity_root='nz', segment="的英文名叫什么?", table_name='foreName'), SingleEntityRule(entity_root='nz', segment="的上映时间是什么时候?", table_name='prodTime'), SingleEntityRule(entity_root='nz', segment="的制片公司是哪家?", table_name='prodCompany'), SingleEntityRule(entity_root='nz', segment="的导演是谁?", table_name='director'), SingleEntityRule(entity_root='nz', segment="的编剧是谁?", table_name='screenwriter'), SingleEntityRule(entity_root='nz', segment="属于什么类型的电影?", table_name='genre'), SingleEntityRule(entity_root='nz', segment="的参演演员有哪些?", table_name='star'), SingleEntityRule(entity_root='nz', segment="的电影时长是多少?", table_name='length'), SingleEntityRule(entity_root='nz', segment="的上映时间是哪天?", table_name='releaseTime'), SingleEntityRule(entity_root='nz', segment="的语言类型是哪种?", table_name='language'), SingleEntityRule(entity_root='nz', segment="获得了哪些成就?", table_name='achiem'), # 实体 相关 实体(无定向) 的属性 SingleEntityRule(entity_root='nr', segment="演过的电影的类型是哪些?", table_name='chName&genre'), SingleEntityRule(entity_root='nr', segment="合作过的导演有哪些?", table_name='chName&director') ] if __name__ == '__main__': pass
调用封装的三个类,业务逻辑的简单实现,不需要额外的赘述了,直接放完整代码。
main_logic.py:
# encoding=utf-8 import jieba import jieba.posseg as pseg from pprint import pprint from KBQA.fuseki_query import * from KBQA.temp_match import * from KBQA.nlp import * if __name__ == '__main__': # cut_sentences("a") # cut_tag('a') # mat() fuseki = Fuseki() query_se_temp = QuerySETemp() query_me_temp = QueryMETemp() text_process = TextureProcess() text_process.load_dicts("actor_dict.txt", "movie_dict.txt") text_process.force_split(['喜剧', '演员'], ['出生', '日期']) # 周星驰的个人简介 print("输入要查询的问题:") while True: in_str = input() if in_str == "exit": break else: text_res = text_process.sentence_split_tag(in_str) segment = "" entity_word_list = [] query_temp = "" for Word in text_res: if Word["root"] == "nr" or Word["root"] == "nz": segment = segment + Word["root"] entity_word_list.append({Word['root']: Word['word']}) else: segment = segment + Word["word"] # print(entity_word_list) if len(entity_word_list) == 0: print("无实体或虚拟实体,》》》转人工域分析 ... ...") continue elif len(entity_word_list) == 1: # print("单实体") query_temp = query_se_temp.get_query_temp(entity_word_list, segment) # print(query_temp) elif len(entity_word_list) >= 2: # print("多实体") query_temp = query_me_temp.get_query_temp(entity_word_list, segment) # print(query_temp) # pprint(res) query_result = fuseki.query(query_temp) # pprint(query_result) parse_dict = parse(query_result) for result_info in parse_dict: if len(parse_dict[result_info]) > 0: print(parse_dict[result_info]) # print(type(query_result)) print("进程结束 ................. ")
本身现在也没有很大的兴趣沉浸在开发里,因此实现思路有了后也只是简单的实现一些基本框架,也没有大的兴趣全部验证一遍,至少个人项目上没那么大精力,但作为产品和架构思想,我想深挖的细节还是有很多,考虑到这段时间对KBQA做了功课,因此整理一下一些信息,对KBQA做了一下几种分类:
方法 | 子方法 | 人力投入 | 资源投入 | 其他说明 |
---|---|---|---|---|
模板匹配 | 精准匹配 | 正则表达实现 | ||
模板匹配 | 模糊匹配 | jaccard、余弦等相似度计算 本次使用的方法更接近模糊匹配 | ||
模板匹配 | 关键词匹配 | 正则、也可以计算相似度 | ||
空间计算 | 向量化问题计算相似,适合Web | |||
语义解析 | 利用词性进行计算 | |||
实体搜索 | 提取实体及关键词,匹配节点 |
以上方法除了精准匹配外,都可以通过深度学习做大规模训练,仅此。
1、相似度计算库macropodus
2、Jaccard系数
3、结巴分词库
4、KBQA实现
5、农业KBQA
6、中文开发知识图谱-REfO实现KBQA