使用 MySQL 自定义 Apache ShardingSphere 高可用性

了解如何以及为什么 ShardingSphere 可以使用 MySQL 作为示例来实现数据库高可用性。
1 位读者喜欢这篇文章。
Business woman on laptop sitting in front of window

图片来源:Mapbox Uncharted ERG, CC-BY 3.0 US

用户有很多选项可以自定义和扩展 ShardingSphere 的高可用性 (HA) 解决方案。我们的团队已经完成了两个 HA 计划:一个基于 MGR 的 MySQL 高可用性解决方案,以及一些社区提交者贡献的 openGauss 数据库高可用性解决方案。 这两个解决方案的原理是相同的。

以下是如何以及为什么 ShardingSphere 可以使用 MySQL 作为示例来实现数据库高可用性

ShardingSphere high availability components

(Zhao Jinchao, CC BY-SA 4.0)

先决条件

ShardingSphere 通过执行以下 SQL 语句来检查底层 MySQL 集群环境是否准备就绪。 如果任何测试失败,ShardingSphere 将无法启动。

检查是否安装了 MGR

SELECT * FROM information_schema.PLUGINS WHERE PLUGIN_NAME='group_replication'

查看 MGR 组成员数量。 底层 MGR 集群应至少包含三个节点

SELECT count(*) FROM performance_schema.replication_group_members

检查 MGR 集群的组名是否与配置中的组名一致。 组名是 MGR 组的标记,MGR 集群的每个组只有一个组名

SELECT * FROM performance_schema.global_variables WHERE VARIABLE_NAME='group_replication_group_name' 

检查当前 MGR 是否设置为单主模式。 目前,ShardingSphere 不支持双写或多写场景。 它仅支持单写模式

SELECT * FROM performance_schema.global_variables WHERE VARIABLE_NAME='group_replication_single_primary_mode'

查询 MGR 组集群中的所有节点主机、端口和状态,以检查配置的数据源是否正确

SELECT MEMBER_HOST, MEMBER_PORT, MEMBER_STATE FROM performance_schema.replication_group_members

动态主数据库发现

ShardingSphere 根据 MySQL 提供的查询主数据库 SQL 命令查找主数据库 URL

private String findPrimaryDataSourceURL(final Map<String, DataSource> dataSourceMap) {
    String result = "";
    String sql = "SELECT MEMBER_HOST, MEMBER_PORT FROM performance_schema.replication_group_members WHERE MEMBER_ID = "
            + "(SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'group_replication_primary_member')";
    for (DataSource each : dataSourceMap.values()) {
        try (Connection connection = each.getConnection();
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(sql)) {
            if (resultSet.next()) {
                return String.format("%s:%s", resultSet.getString("MEMBER_HOST"), resultSet.getString("MEMBER_PORT"));
            }
        } catch (final SQLException ex) {
            log.error("An exception occurred while find primary data source url", ex);
        }
    }
    return result;
}

将上述找到的主数据库 URL 与配置的 dataSources URL 逐一进行比较。 匹配的数据源是主数据库。 它将被更新到当前的 ShardingSphere 内存,并永久保存到注册中心,并通过该注册中心分发到集群中的其他计算节点。

registry center

(Zhao Jinchao, CC BY-SA 4.0)

动态备用数据库发现

ShardingSphere 中有两种类型的备用数据库状态:启用和禁用。 备用数据库状态将同步到 ShardingSphere 内存,以确保可以正确路由读取流量。

获取 MGR 组中的所有节点

SELECT MEMBER_HOST, MEMBER_PORT, MEMBER_STATE FROM performance_schema.replication_group_members

禁用备用数据库

private void determineDisabledDataSource(final String schemaName, final Map<String, DataSource> activeDataSourceMap,
                                         final List<String> memberDataSourceURLs, final Map<String, String> dataSourceURLs) {
    for (Entry<String, DataSource> entry : activeDataSourceMap.entrySet()) {
        boolean disable = true;
        String url = null;
        try (Connection connection = entry.getValue().getConnection()) {
            url = connection.getMetaData().getURL();
            for (String each : memberDataSourceURLs) {
                if (null != url && url.contains(each)) {
                    disable = false;
                    break;
                }
            }
        } catch (final SQLException ex) {
            log.error("An exception occurred while find data source urls", ex);
        }
        if (disable) {
            ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(schemaName, entry.getKey(), true));
        } else if (!url.isEmpty()) {
            dataSourceURLs.put(entry.getKey(), url);
        }
    }
}

是否禁用备用数据库取决于配置的数据源和 MGR 组中的所有节点。

ShardingSphere 可以逐一检查配置的数据源是否可以正确获取 Connection,并验证数据源 URL 是否包含 MGR 组的节点。

如果无法获取 Connection 或验证失败,ShardingSphere 将通过事件触发器禁用数据源,并将其同步到注册中心。

启用备用数据库

private void determineEnabledDataSource(final Map<String, DataSource> dataSourceMap, final String schemaName,
                                        final List<String> memberDataSourceURLs, final Map<String, String> dataSourceURLs) {
    for (String each : memberDataSourceURLs) {
        boolean enable = true;
        for (Entry<String, String> entry : dataSourceURLs.entrySet()) {
            if (entry.getValue().contains(each)) {
                enable = false;
                break;
            }
        }
        if (!enable) {
            continue;
        }
        for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
            String url;
            try (Connection connection = entry.getValue().getConnection()) {
                url = connection.getMetaData().getURL();
                if (null != url && url.contains(each)) {
                    ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(schemaName, entry.getKey(), false));
                    break;
                }
            } catch (final SQLException ex) {
                log.error("An exception occurred while find enable data source urls", ex);
            }
        }
    }
}

崩溃的备用数据库恢复并添加到 MGR 组后,将检查配置以查看是否使用了恢复的数据源。 如果是,事件触发器将告诉 ShardingSphere 需要启用该数据源。

心跳机制

HA 模块引入了心跳机制,以确保主备状态实时同步。

通过集成 ShardingSphere 子项目 ElasticJob,当 HA 模块初始化时,上述流程由 ElasticJob 调度框架以 Job 的形式执行,从而实现功能开发和作业调度的分离。

即使开发人员需要扩展 HA 功能,他们也不需要关心如何开发和操作作业

private void initHeartBeatJobs(final String schemaName, final Map<String, DataSource> dataSourceMap) {
    Optional<ModeScheduleContext> modeScheduleContext = ModeScheduleContextFactory.getInstance().get();
    if (modeScheduleContext.isPresent()) {
        for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
            Map<String, DataSource> dataSources = dataSourceMap.entrySet().stream().filter(dataSource -> !entry.getValue().getDisabledDataSourceNames().contains(dataSource.getKey()))
                    .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
            CronJob job = new CronJob(entry.getValue().getDatabaseDiscoveryType().getType() + "-" + entry.getValue().getGroupName(),
                each -> new HeartbeatJob(schemaName, dataSources, entry.getValue().getGroupName(), entry.getValue().getDatabaseDiscoveryType(), entry.getValue().getDisabledDataSourceNames())
                            .execute(null), entry.getValue().getHeartbeatProps().getProperty("keep-alive-cron"));
            modeScheduleContext.get().startCronJob(job);
        }
    }
}

总结

到目前为止,Apache ShardingSphere 的 HA 功能已被证明适用于 MySQL 和 openGauss HA 解决方案。 未来,它将集成更多 MySQL HA 产品并支持更多数据库 HA 解决方案。

与往常一样,如果您有兴趣,非常欢迎您加入我们并为 Apache ShardingSphere 项目做出贡献。

Apache ShardingSphere 开源项目链接

标签

评论已关闭。

Creative Commons License本作品采用 Creative Commons Attribution-Share Alike 4.0 International License 许可。
© . All rights reserved.