2021SC@SDUSC
OSSIM-agent源代码分析(四)
OSSIM-agent-config.py下
OSSIM Agent的主要职责是收集网络上存在的各种设备发送的所有数据,然后按照一种标准方式(standardized way)有序的发送给OSSIM Server,Agent收集到数据后在发送给Server之前要对这些数据进行标准化处理,这样Server就可以依一种统一的方式来处理这些信息,并且也简化了Server的处理过程
agent的配置文件是config.py,作为大量数据的标准化处理位置,配置文件的对各种数据、文件的处理方式的
在plugin.cfg文件中显示,各种常量的设置
_NEEDED_CONFIG_ENTRIES = { 'config': ['type', 'source', 'enable'] } _EXIT_IF_MALFORMED_CONFIG = False TRANSLATION_SECTION = 'translation' TRANSLATION_FUNCTION = 'translate' TRANSLATION_DEFAULT = '_DEFAULT_' SECTIONS_NOT_RULES = ["config", "info", TRANSLATION_SECTION] CONCAT_FUNCTION = 'CONCAT' _MAP_REPLACE_TRANSLATIONS = 4 _MAP_REPLACE_USER_FUNCTIONS = 8 _MAP_REPLACE_CUSTOM_USER_FUNCTIONS = 16 _MAP_REPLACE_CONCAT = 32 #00100000 _MAP_REPLACE_TRANSLATE2 = 64 #1000000
检查所选部分是否为可以的翻译部分
__regexIsTranslationSection = re.compile("translation-\S+", re.UNICODE) __regex_check_for_translate2_function = re.compile("\{translate2\((?P<variable>\$[^\)]+),(?P<translate_section>\$[^\)]+)\)\}", re.UNICODE)
设置配置、翻译等具体的规则
def rules(self): rules = {} for section in sorted(self.sections()): regexp = self.get(section,"regexp") if self.get("config","source") == "log" and (regexp is None or regexp == ""): continue if Plugin.__regexIsTranslationSection.match(section.lower()) is not None: if section.lower() not in Plugin.SECTIONS_NOT_RULES: logger.info("Loading new translation section... %s" % section.lower()) Plugin.SECTIONS_NOT_RULES.append(section.lower()) if section.lower() not in Plugin.SECTIONS_NOT_RULES: rules[section] = self.hitems(section,True) return rules
通过正则表达式和位置变换进行变量替换
def _replace_array_variables(self, value, groups): for i in range(2): search = re.findall("\{\$[^\}\{]+\}", value, re.UNICODE) rvalue = value if search != []: for string in search: var = string[2:-1] var_position = 0 try: var_position= int(var) if var_position < len(groups): rvalue = rvalue.replace(string, str(groups[var_position])) except ValueError,e: rvalue = value return rvalue
替换变量、用户功能和自定义用户功能
def get_replace_array_value(self, value, groups): rvalue = "" rvalue = self._replace_array_variables(value, groups) if rvalue==value: rvalue = self._replace_user_array_functions(value, groups) if rvalue == value: rvalue = self._replace_custom_user_function_array(value,groups) return rvalue
使用位置参数替换自定义用户函数,并且检查所有变量是否有替换,以及调用函数在ParserUtil.py
def _replace_custom_user_function_array(self,value,groups): search = re.findall("(\{:(\w+)\((\$[^\)]+)?\)\})", value, re.UNICODE) if search != []: for string in search: (string_matched, func, variables) = string vars = split_variables(variables) for var in vars: var_pos =0 try: var_pos = int(var) except TypeError: logger.warning("Can not replace '%s'" % var) return value if var_pos >= len(groups): logger.warning("Can not replace '%s' " % var) return value func_name = "%s_%s" % (func,self.get("DEFAULT", "plugin_id")) if func_name != Plugin.TRANSLATION_FUNCTION and \ hasattr(Plugin, func_name): args = ",".join(["groups[%s]"%s for s in vars]) try: cmd = "value = value.replace(string_matched, str(self.%s(%s)))" %(func_name,args) exec cmd except TypeError, e: logger.error(e) else: logger.warning( "Function '%s' is not implemented" % (func)) value = value.replace(string_matched, str(groups[var])) return value
替换位置函数,检查所有变量是否有替换,然后调用ParserUtil.py函数、getattr等函数执行替换功能
def _replace_user_array_functions(self, value, groups): if value is None: return None search = re.findall("(\{(\w+)\((\$[^\)]+)\)\})", value, re.UNICODE) if search != []: for string in search: (string_matched, func, variables) = string vars = split_variables(variables) for var in vars: if len(groups) - 1 < int(var): return value if func != Plugin.TRANSLATION_FUNCTION and \ hasattr(ParserUtil, func): f = getattr(ParserUtil, func) args = "" for v in vars: args += "str(groups[%s])," % (str(v)) try: exec "value = value.replace(string_matched," + \ "str(f(" + args + ")))" except TypeError, e: logger.error(e) else: logger.debug("Tranlation fucntion...") for v in vars: . if int(v) > (len(groups) - 1): logger.debug("Error var:%d, is greatter than groups size. (%d)" % (int(v), len(groups))) else: var_value = groups[int(v)] if self.has_section(Plugin.TRANSLATION_SECTION): if self.has_option(Plugin.TRANSLATION_SECTION, var_value): value = self.get(Plugin.TRANSLATION_SECTION, var_value) return value
通过正则表达式,查找配置参数中的_CFG(section,option)值,并将其替换为全局配置文件中的值
def replace_config(self, conf): for section in sorted(self.sections()): for option in self.options(section): regexp = self.get(section, option) search = re.findall("(\\\\_CFG\(([\w-]+),([\w-]+)\))", regexp, re.UNICODE) if search != []: for string in search: (all, arg1, arg2) = string if conf.has_option(arg1, arg2): value = conf.get(arg1, arg2) regexp = regexp.replace(all, value) self.set(section, option, regexp)
总规则下,regexp条目中替换处理每一个反斜线
def replace_aliases(self, aliases) for rule in self.rules().iterkeys(): regexp = self.get(rule, 'regexp') search = re.findall("\\\\\w\w+", regexp, re.UNICODE) if search != []: for string in search: repl = string[1:] if aliases.has_option("regexp", repl): value = aliases.get("regexp", repl) regexp = regexp.replace(string, value) self.set(rule, "regexp", regexp)
从get_replace_value()调用,通过正则替换变量
def _replace_variables(self, value, groups, rounds=2): for i in range(rounds): search = re.findall("\{\$[^\}\{]+\}", value, re.UNICODE) if search != []: for string in search: var = string[2:-1] if groups.has_key(var): value = value.replace(string, str(groups[var])) return value
确定替换变量是否成功,如果失败,则跳过get_replace_value()
def _replace_variables_assess(self, value): ret = 0 for i in range(2): search = re.findall("\{\$[^\}\{]+\}", value, re.UNICODE) if search != []: ret = i return ret
通过搜索字符串方式,替换指定插件及其对应插件
def _replace_translations(self, value, groups): regexp = "(\{(" + Plugin.TRANSLATION_FUNCTION + ")\(\$([^\)]+)\)\})" search = re.findall(regexp, value, re.UNICODE) if search != []: for string in search: (string_matched, func, var) = string if groups.has_key(var): if self.has_section(Plugin.TRANSLATION_SECTION): if self.has_option(Plugin.TRANSLATION_SECTION, groups[var]): value = self.get(Plugin.TRANSLATION_SECTION, groups[var]) else: logger.warning("Can not translate '%s' value" % \ (groups[var])) if self.has_option(Plugin.TRANSLATION_SECTION, Plugin.TRANSLATION_DEFAULT): value = self.get(Plugin.TRANSLATION_SECTION, Plugin.TRANSLATION_DEFAULT) else: value = groups[var] else: logger.warning("There is no translation section") value = groups[var] return value
同上,确定替换变量是否成功,如果失败,则跳过get_replace_value()
def _replace_translations_assess(self, value): regexp = "(\{(" + Plugin.TRANSLATION_FUNCTION + ")\(\$([^\)]+)\)\})" search = re.findall(regexp, value, re.UNICODE) if search != []: return self._MAP_REPLACE_TRANSLATIONS return 0
函数被指定为{f($v)},并且从get_replace_value()调用,执行替换功能
def _replace_user_functions(self, value, groups): search = re.findall("(\{(\w+)\((\$[^\)]+)\)\})", value, re.UNICODE) if search != []: for string in search: (string_matched, func, variables) = string vars = split_variables(variables) for var in vars: if not groups.has_key(var): logger.warning("Can not replace '%s'" % (value)) return value if func != Plugin.TRANSLATION_FUNCTION and \ hasattr(ParserUtil, func): f = getattr(ParserUtil, func) args = "" for i in (range(len(vars))): args += "groups[vars[%s]]," % (str(i)) try: cmd = "value = value.replace(string_matched," + \ "str(f(" + args + ")))" exec cmd except TypeError, e: logger.error(e) else: logger.warning( "Function '%s' is not implemented" % (func)) value = value.replace(string_matched, \ str(groups[var])) return value
同上,确定替换变量是否成功,如果失败,则跳过get_replace_value()
def _replace_user_functions_assess(self, value): search = re.findall("(\{(\w+)\((\$[^\)]+)\)\})", value, re.UNICODE) if search != []: return self._MAP_REPLACE_USER_FUNCTIONS return 0
检查所有变量是否有替换,如果没有一直替换到全部替换为止
def _replace_custom_user_functions(self,value,groups): search = re.findall("(\{:(\w+)\((\$[^\)]+)?\)\})", value, re.UNICODE) if search != []: for string in search: (string_matched, func, variables) = string vars = split_variables(variables) for var in vars: if not groups.has_key(var): logger.warning("Can not replace '%s'" % (var)) return value func_name = "%s_%s" % (func,self.get("DEFAULT", "plugin_id")) if func_name != Plugin.TRANSLATION_FUNCTION and \ hasattr(Plugin, func_name): args = "" for i in (range(len(vars))): args += "groups[vars[%s]]," % (str(i)) try: cmd = "value = value.replace(string_matched, str(self.%s(%s)))" %(func_name,args) exec cmd except TypeError, e: logger.error(e) else: logger.warning( "Function '%s' is not implemented" % (func)) value = value.replace(string_matched, \ str(groups[var])) return value
替换数组变量连接的值
def __replaceConcatFunction(self,value,groups): concat ="" m = re.match("\$CONCAT\((?P<params>.*)\)",value) if m: mdict = m.groupdict() if mdict.has_key('params'): paramlist = mdict['params'].split(',') for param in paramlist: if param.startswith('$'):#assume variable if groups.has_key(param[1:]): concat+=groups[param[1:]] else: concat+=param else: concat+=param return concat
检查是否有必要进行连接函数替换
def __checkReplaceConcatFunction(self,value): ret = 0 if re.match("\$CONCAT\((.*)\)",value): ret = self._MAP_REPLACE_CONCAT return ret
替换传递过来的对象中给定的值
def __replace_translate2_function(self,value,groups): replaced_value ="" m = self.__regex_check_for_translate2_function.match(value) if m: mdict = m.groupdict() if 'variable' in mdict and 'translate_section' in mdict: variable = mdict['variable'].replace('$','') translate_section = mdict['translate_section'].replace('$','') if variable in groups: if self.has_section(translate_section): if self.has_option(translate_section,groups[variable]): replaced_value = self.get(translate_section, groups[variable]) return replaced_value else: logger.warning("Translate2 variable(%s) not found on the translate section %s" % (groups[variable],translate_section)) else: logger.warning("Translate2 translation_section(%s) not found" % translate_section) replaced_value = value
检查是否需要换translate2
def __check_for_translate2_function(self,value): """Check whether we need to make a translate2 replacement or not. @param value: the string in the rule """ ret = 0 if Plugin.__regex_check_for_translate2_function.match(value) is not None: ret = self._MAP_REPLACE_TRANSLATE2 return ret
替换concat函数、自定义用户函数、用户函数、翻译、变量等
def get_replace_value(self, value, groups, replace=15): if replace > 0: if replace & self._MAP_REPLACE_TRANSLATE2: value = self.__replace_translate2_function(value, groups) if replace & 3: value = self._replace_variables(value, groups, (replace & 3)) if replace & 4: value = self._replace_translations(value, groups) if replace & 8: value = self._replace_user_functions(value, groups) if replace & self._MAP_REPLACE_CUSTOM_USER_FUNCTIONS: value = self._replace_custom_user_functions(value,groups) if replace & self._MAP_REPLACE_CONCAT: value = self.__replaceConcatFunction(value,groups) return value
类CommandLineOptions,用于配置各种数据
class CommandLineOptions: def __init__(self): self.__options = None parser = OptionParser( usage="%prog [-v] [-q] [-d] [-f] [-g] [-c config_file]", version="OSSIM (Open Source Security Information Management) " + \ "- Agent ") parser.add_option("-v", "--verbose", dest="verbose", action="count", help="verbose mode, makes lot of noise") parser.add_option("-d", "--daemon", dest="daemon", action="store_true", help="Run agent in daemon mode") parser.add_option("-f", "--force", dest="force", action="store_true", help="Force startup overriding pidfile") parser.add_option("-s", "--stats", dest="stats", type='choice', choices=['all', 'clients', 'plugins'], default=None, help="Get stats about the agent") parser.add_option("-c", "--config", dest="config_file", action="store", help="read config from FILE", metavar="FILE") (self.__options, args) = parser.parse_args() if len(args) > 1: parser.error("incorrect number of arguments") if self.__options.verbose and self.__options.daemon: parser.error("incompatible options -v -d") def get_options(self): return self.__options
进行数组等变量创建
def split_variables(string): return re.findall("(?:\$([^,\s]+))+", string) def split_sids(string, separator=','): list = list_tmp = [] list = string.split(separator) for sid in list: a = sid.split('-') if len(a) == 2: list.remove(sid) for i in range(int(a[0]), int(a[1]) + 1): list_tmp.append(str(i)) list.extend(list_tmp) return list