官方提供的 IK 分词器只支持通过远程文本文件热更新词库,不支持通过 MySQL 热更新。不过没关系,这难不倒伟大的博主,哈哈哈。今天就来和大家讲一下如何基于 MySQL 实现 IK 分词器词库的热更新。
-- Main (extension) dictionary for the IK analyzer, one row per word.
-- To remove a word, set is_deleted = 1 instead of DELETE-ing the row: the sync only
-- disables words that it reads back with is_deleted = 1, and a physically deleted row
-- can never be read back, so the word would stay in the dictionary.
-- update_time is bumped automatically on every change and drives the incremental sync.
CREATE TABLE `es_extra_main` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `word` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '词',
  `is_deleted` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否已删除',
  `update_time` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP (6) COMMENT '更新时间',
  PRIMARY KEY (`id`),
  -- The poller filters by update_time on every run; without this index each poll is a full
  -- table scan. NOTE(review): confirm the configured jdbc.update.main.dic.sql filters on update_time.
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Stop-word dictionary; same layout and soft-delete convention as es_extra_main.
CREATE TABLE `es_extra_stopword` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `word` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '词',
  `is_deleted` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否已删除',
  `update_time` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP (6) COMMENT '更新时间',
  PRIMARY KEY (`id`),
  -- Same reason as above. NOTE(review): confirm jdbc.update.stopword.sql filters on update_time.
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
<!-- MySQL JDBC driver, used by DatabaseMonitor (via java.sql.DriverManager) to poll the
     dictionary tables. It must also be bundled into the plugin package, otherwise it is
     missing at runtime. -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.29</version>
</dependency>
这里新增一个用于从 MySQL 拉取词库更新的类。IK 源码中已有基于远程文本文件热更新的实现,我们仿照它来写一个就可以啦。
package org.wltea.analyzer.dic; import org.apache.logging.log4j.Logger; import org.elasticsearch.SpecialPermission; import org.wltea.analyzer.help.ESPluginLoggerFactory; import java.security.AccessController; import java.security.PrivilegedAction; import java.sql.*; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; public class DatabaseMonitor implements Runnable { private static final Logger logger = ESPluginLoggerFactory.getLogger(DatabaseMonitor.class.getName()); public static final String PATH_JDBC_PROPERTIES = "jdbc.properties"; private static final String JDBC_URL = "jdbc.url"; private static final String JDBC_USERNAME = "jdbc.username"; private static final String JDBC_PASSWORD = "jdbc.password"; private static final String JDBC_DRIVER = "jdbc.driver"; private static final String SQL_UPDATE_MAIN_DIC = "jdbc.update.main.dic.sql"; private static final String SQL_UPDATE_STOPWORD = "jdbc.update.stopword.sql"; /** * 更新间隔 */ public final static String JDBC_UPDATE_INTERVAL = "jdbc.update.interval"; private static final Timestamp DEFAULT_LAST_UPDATE = Timestamp.valueOf(LocalDateTime.of(LocalDate.of(2020, 1, 1), LocalTime.MIN)); private static Timestamp lastUpdateTimeOfMainDic = null; private static Timestamp lastUpdateTimeOfStopword = null; public String getUrl() { return Dictionary.getSingleton().getProperty(JDBC_URL); } public String getUsername() { return Dictionary.getSingleton().getProperty(JDBC_USERNAME); } public String getPassword() { return Dictionary.getSingleton().getProperty(JDBC_PASSWORD); } public String getDriver() { return Dictionary.getSingleton().getProperty(JDBC_DRIVER); } public String getUpdateMainDicSql() { return Dictionary.getSingleton().getProperty(SQL_UPDATE_MAIN_DIC); } public String getUpdateStopwordSql() { return Dictionary.getSingleton().getProperty(SQL_UPDATE_STOPWORD); } /** * 加载MySQL驱动 */ public DatabaseMonitor() { SpecialPermission.check(); AccessController.doPrivileged((PrivilegedAction<Void>) 
() -> { try { Class.forName(getDriver()); } catch (ClassNotFoundException e) { logger.error("mysql jdbc driver not found", e); } return null; }); } @Override public void run() { SpecialPermission.check(); AccessController.doPrivileged((PrivilegedAction<Void>) () -> { Connection conn = getConnection(); // 更新主词典 updateMainDic(conn); // 更新停用词 updateStopword(conn); closeConnection(conn); return null; }); } public Connection getConnection() { Connection connection = null; try { connection = DriverManager.getConnection(getUrl(), getUsername(), getPassword()); } catch (SQLException e) { logger.error("failed to get connection", e); } return connection; } public void closeConnection(Connection conn) { if (conn != null) { try { conn.close(); } catch (SQLException e) { logger.error("failed to close Connection", e); } } } public void closeRsAndPs(ResultSet rs, PreparedStatement ps) { if (rs != null) { try { rs.close(); } catch (SQLException e) { logger.error("failed to close ResultSet", e); } } if (ps != null) { try { ps.close(); } catch (SQLException e) { logger.error("failed to close PreparedStatement", e); } } } /** * 主词典 */ public synchronized void updateMainDic(Connection conn) { logger.info("start update main dic"); int numberOfAddWords = 0; int numberOfDisableWords = 0; PreparedStatement ps = null; ResultSet rs = null; try { String sql = getUpdateMainDicSql(); Timestamp param = lastUpdateTimeOfMainDic == null ? 
DEFAULT_LAST_UPDATE : lastUpdateTimeOfMainDic; logger.info("param: " + param); ps = conn.prepareStatement(sql); ps.setTimestamp(1, param); rs = ps.executeQuery(); while (rs.next()) { String word = rs.getString("word"); word = word.trim(); if (word.isEmpty()) { continue; } lastUpdateTimeOfMainDic = rs.getTimestamp("update_time"); if (rs.getBoolean("is_deleted")) { logger.info("[main dic] disable word: {}", word); // 删除 Dictionary.disableWord(word); numberOfDisableWords++; } else { logger.info("[main dic] add word: {}", word); // 添加 Dictionary.addWord(word); numberOfAddWords++; } } logger.info("end update main dic -> addWord: {}, disableWord: {}", numberOfAddWords, numberOfDisableWords); } catch (SQLException e) { logger.error("failed to update main_dic", e); // 关闭 ResultSet、PreparedStatement closeRsAndPs(rs, ps); } } /** * 停用词 */ public synchronized void updateStopword(Connection conn) { logger.info("start update stopword"); int numberOfAddWords = 0; int numberOfDisableWords = 0; PreparedStatement ps = null; ResultSet rs = null; try { String sql = getUpdateStopwordSql(); Timestamp param = lastUpdateTimeOfStopword == null ? DEFAULT_LAST_UPDATE : lastUpdateTimeOfStopword; logger.info("param: " + param); ps = conn.prepareStatement(sql); ps.setTimestamp(1, param); rs = ps.executeQuery(); while (rs.next()) { String word = rs.getString("word"); word = word.trim(); if (word.isEmpty()) { continue; } lastUpdateTimeOfStopword = rs.getTimestamp("update_time"); if (rs.getBoolean("is_deleted")) { logger.info("[stopword] disable word: {}", word); // 删除 Dictionary.disableStopword(word); numberOfDisableWords++; } else { logger.info("[stopword] add word: {}", word); // 添加 Dictionary.addStopword(word); numberOfAddWords++; } } logger.info("end update stopword -> addWord: {}, disableWord: {}", numberOfAddWords, numberOfDisableWords); } catch (SQLException e) { logger.error("failed to update main_dic", e); } finally { // 关闭 ResultSet、PreparedStatement closeRsAndPs(rs, ps); } } }
在 Dictionary 的初始化方法中新增对加载 JDBC 配置方法(即下文的 loadJdbcProperties)的调用,并将 getProperty 方法改为 public,以便 DatabaseMonitor 读取数据库配置。
并在 Dictionary 类的末尾新增下面这些方法:
/**
 * Adds a word to the main dictionary at runtime (called e.g. by {@code DatabaseMonitor}).
 * The word is trimmed and lower-cased first; {@code null} or blank words are ignored.
 *
 * @param word the word to add
 */
public static void addWord(String word) {
    char[] chars = normalizeRuntimeWord(word);
    if (chars != null) {
        singleton._MainDict.fillSegment(chars);
    }
}

/**
 * Disables (masks) a word in the main dictionary at runtime.
 * The word is trimmed and lower-cased first; {@code null} or blank words are ignored.
 *
 * @param word the word to disable
 */
public static void disableWord(String word) {
    char[] chars = normalizeRuntimeWord(word);
    if (chars != null) {
        singleton._MainDict.disableSegment(chars);
    }
}

/**
 * Adds a stop word at runtime.
 * The word is trimmed and lower-cased first; {@code null} or blank words are ignored.
 *
 * @param word the stop word to add
 */
public static void addStopword(String word) {
    char[] chars = normalizeRuntimeWord(word);
    if (chars != null) {
        singleton._StopWords.fillSegment(chars);
    }
}

/**
 * Disables (masks) a stop word at runtime.
 * The word is trimmed and lower-cased first; {@code null} or blank words are ignored.
 *
 * @param word the stop word to disable
 */
public static void disableStopword(String word) {
    char[] chars = normalizeRuntimeWord(word);
    if (chars != null) {
        singleton._StopWords.disableSegment(chars);
    }
}

/**
 * Trims and lower-cases a word for the runtime add/disable methods.
 * Blank input is rejected here rather than handing an empty array to the dictionary segment.
 *
 * @param word the raw word, possibly {@code null}
 * @return the normalized characters, or {@code null} when {@code word} is {@code null} or blank
 */
private static char[] normalizeRuntimeWord(String word) {
    if (word == null) {
        return null;
    }
    String normalized = word.trim().toLowerCase();
    return normalized.isEmpty() ? null : normalized.toCharArray();
}

/**
 * Loads {@code jdbc.properties} from the dictionary root directory into {@code props} and logs
 * the loaded entries, masking every value whose key contains "password".
 * Read errors are logged, not thrown.
 */
public void loadJdbcProperties() {
    Path file = PathUtils.get(getDictRoot(), DatabaseMonitor.PATH_JDBC_PROPERTIES);
    // try-with-resources: the stream is closed even when load() fails.
    try (FileInputStream in = new FileInputStream(file.toFile())) {
        props.load(in);
        logger.info("====================================properties====================================");
        for (Map.Entry<Object, Object> entry : props.entrySet()) {
            String key = String.valueOf(entry.getKey());
            // Never write credentials (e.g. jdbc.password) to the Elasticsearch log.
            Object value = key.toLowerCase(java.util.Locale.ROOT).contains("password") ? "******" : entry.getValue();
            logger.info("{}: {}", key, value);
        }
        logger.info("====================================properties====================================");
    } catch (IOException e) {
        logger.error("failed to read file: " + DatabaseMonitor.PATH_JDBC_PROPERTIES, e);
    }
}
grant {
  // needed because of the hot reload functionality
  // Allows the plugin to open outbound TCP connections (e.g. to the MySQL server) and
  // resolve host names.
  // NOTE(review): "*" permits any host/port; consider narrowing it to the database's host:port.
  permission java.net.SocketPermission "*", "connect,resolve";
  // NOTE(review): presumably required by the JDBC driver / hot-reload code path — confirm it is still needed.
  permission java.lang.RuntimePermission "setContextClassLoader";
};
在插件的打包(assembly)配置中把 MySQL 驱动的 jar 包依赖加进来,否则打出的插件包会缺少该 jar 包,导致运行时报错。
<!-- Bundle the MySQL JDBC driver jar into the plugin package; otherwise it is missing at runtime. -->
<include>mysql:mysql-connector-java</include>
执行打包生成 zip 文件,然后将其解压成文件夹。
将解压后的文件夹放到 ES 的 plugins 目录下,然后在 config 目录下的 jdbc.properties 中配置数据库连接信息,最后重启 ES 即可完成安装。
在数据库表中新增下面这些自己想要的关键词,然后到 Kibana 中测试验证,可以发现已经生效啦。
关键词
停用词
# Analyze a sample sentence with the ik_max_word analyzer to check that the keywords and
# stop words added to the database have been picked up.
POST _analyze
{
  "text": ["俺是熊二呗"],
  "analyzer": "ik_max_word"
}
运行结果