ウェブサイト検索

MySQL を使用して Apache ShardingSphere の高可用性をカスタマイズする


MySQL を例として使用して、ShardingSphere がデータベースの高可用性を実現する方法とその理由を学びましょう。

ユーザーには、ShardingSphere の高可用性 (HA) ソリューションをカスタマイズおよび拡張するための多くのオプションがあります。私たちのチームは 2 つの HA 計画を完了しました。MGR に基づく MySQL 高可用性ソリューションと、一部のコミュニティ コミッターによって提供された openGauss データベース高可用性ソリューションです。 2 つのソリューションの原理は同じです。

以下は、例として MySQL を使用して ShardingSphere がデータベースの高可用性を実現する方法と理由を示しています。

(趙金超、CC BY-SA 4.0)

前提条件

ShardingSphere は、次の SQL ステートメントを実行して、基盤となる MySQL クラスター環境の準備ができているかどうかを確認します。いずれかのテストが失敗した場合、ShardingSphere を開始できません。

MGR がインストールされているかどうかを確認します。

SELECT * FROM information_schema.PLUGINS WHERE PLUGIN_NAME='group_replication'

MGR グループのメンバー番号を表示します。基礎となる MGR クラスターは、少なくとも 3 つのノードで構成されている必要があります。

SELECT count(*) FROM performance_schema.replication_group_members

MGR クラスターのグループ名が構成内のグループ名と一致しているかどうかを確認してください。グループ名は MGR グループのマーカーであり、MGR クラスターの各グループにはグループ名が 1 つだけあります。

SELECT * FROM performance_schema.global_variables WHERE VARIABLE_NAME='group_replication_group_name' 

現在の MGR がシングル プライマリ モードとして設定されているかどうかを確認します。現在、ShardingSphere は二重書き込みまたは複数書き込みのシナリオをサポートしていません。単一書き込みモードのみをサポートします。

SELECT * FROM performance_schema.global_variables WHERE VARIABLE_NAME='group_replication_single_primary_mode'

MGR グループ クラスター内のすべてのノード ホスト、ポート、および状態をクエリして、構成されたデータ ソースが正しいかどうかを確認します。

SELECT MEMBER_HOST, MEMBER_PORT, MEMBER_STATE FROM performance_schema.replication_group_members

動的プライマリデータベース検出

ShardingSphere は、MySQL が提供するクエリ マスター データベース SQL コマンドに従ってプライマリ データベース URL を検索します。

private String findPrimaryDataSourceURL(final Map<String, DataSource> dataSourceMap) {
    String result = "";
    String sql = "SELECT MEMBER_HOST, MEMBER_PORT FROM performance_schema.replication_group_members WHERE MEMBER_ID = "
            + "(SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'group_replication_primary_member')";
    for (DataSource each : dataSourceMap.values()) {
        try (Connection connection = each.getConnection();
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(sql)) {
            if (resultSet.next()) {
                return String.format("%s:%s", resultSet.getString("MEMBER_HOST"), resultSet.getString("MEMBER_PORT"));
            }
        } catch (final SQLException ex) {
            log.error("An exception occurred while find primary data source url", ex);
        }
    }
    return result;
}

上記で見つかったプライマリ データベースの URL を、構成された dataSources URL と 1 つずつ比較します。一致したデータ ソースがプライマリ データベースです。これは現在の ShardingSphere メモリに更新され、レジストリ センターに永続化され、そこを通じてクラスター内の他の計算ノードに配布されます。

(趙金超、CC BY-SA 4.0)

動的なセカンダリ データベースの検出

ShardingSphere には、有効と無効の 2 種類のセカンダリ データベースの状態があります。セカンダリ データベースの状態は ShardingSphere メモリと同期され、読み取りトラフィックが正しくルーティングされることが保証されます。

MGR グループ内のすべてのノードを取得します。

SELECT MEMBER_HOST, MEMBER_PORT, MEMBER_STATE FROM performance_schema.replication_group_members

セカンダリ データベースを無効にします。

private void determineDisabledDataSource(final String schemaName, final Map<String, DataSource> activeDataSourceMap,
                                         final List<String> memberDataSourceURLs, final Map<String, String> dataSourceURLs) {
    for (Entry<String, DataSource> entry : activeDataSourceMap.entrySet()) {
        boolean disable = true;
        String url = null;
        try (Connection connection = entry.getValue().getConnection()) {
            url = connection.getMetaData().getURL();
            for (String each : memberDataSourceURLs) {
                if (null != url && url.contains(each)) {
                    disable = false;
                    break;
                }
            }
        } catch (final SQLException ex) {
            log.error("An exception occurred while find data source urls", ex);
        }
        if (disable) {
            ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(schemaName, entry.getKey(), true));
        } else if (!url.isEmpty()) {
            dataSourceURLs.put(entry.getKey(), url);
        }
    }
}

セカンダリ データベースが無効かどうかは、構成されたデータ ソースと MGR グループ内のすべてのノードに基づきます。

ShardingSphere は、構成されているデータ ソースが Connection を適切に取得できるかどうかを 1 つずつチェックし、データ ソース URL に MGR グループのノードが含まれているかどうかを検証します。

Connection を取得できない場合、または検証が失敗した場合、ShardingSphere はイベント トリガーによってデータ ソースを無効にし、レジストリ センターと同期します。

セカンダリ データベースを有効にします。

private void determineEnabledDataSource(final Map<String, DataSource> dataSourceMap, final String schemaName,
                                        final List<String> memberDataSourceURLs, final Map<String, String> dataSourceURLs) {
    for (String each : memberDataSourceURLs) {
        boolean enable = true;
        for (Entry<String, String> entry : dataSourceURLs.entrySet()) {
            if (entry.getValue().contains(each)) {
                enable = false;
                break;
            }
        }
        if (!enable) {
            continue;
        }
        for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
            String url;
            try (Connection connection = entry.getValue().getConnection()) {
                url = connection.getMetaData().getURL();
                if (null != url && url.contains(each)) {
                    ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(schemaName, entry.getKey(), false));
                    break;
                }
            } catch (final SQLException ex) {
                log.error("An exception occurred while find enable data source urls", ex);
            }
        }
    }
}

クラッシュしたセカンダリ データベースが回復され、MGR グループに追加された後、構成がチェックされ、回復されたデータ ソースが使用されているかどうかが確認されます。 「はい」の場合、イベント トリガーは ShardingSphere にデータ ソースを有効にする必要があることを伝えます。

心拍の仕組み

ハートビート メカニズムが HA モジュールに導入され、プライマリとセカンダリの状態がリアルタイムで同期されるようになります。

ShardingSphere サブプロジェクト ElasticJob を統合することで、HA モジュールの初期化時に上記の処理が ElasticJob スケジューラー フレームワークによって Job として実行され、機能開発とジョブのスケジューリングの分離が実現されます。

開発者は HA 機能を拡張する必要がある場合でも、ジョブがどのように開発および操作されるかを気にする必要はありません。

private void initHeartBeatJobs(final String schemaName, final Map<String, DataSource> dataSourceMap) {
    Optional<ModeScheduleContext> modeScheduleContext = ModeScheduleContextFactory.getInstance().get();
    if (modeScheduleContext.isPresent()) {
        for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
            Map<String, DataSource> dataSources = dataSourceMap.entrySet().stream().filter(dataSource -> !entry.getValue().getDisabledDataSourceNames().contains(dataSource.getKey()))
                    .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
            CronJob job = new CronJob(entry.getValue().getDatabaseDiscoveryType().getType() + "-" + entry.getValue().getGroupName(),
                each -> new HeartbeatJob(schemaName, dataSources, entry.getValue().getGroupName(), entry.getValue().getDatabaseDiscoveryType(), entry.getValue().getDisabledDataSourceNames())
                            .execute(null), entry.getValue().getHeartbeatProps().getProperty("keep-alive-cron"));
            modeScheduleContext.get().startCronJob(job);
        }
    }
}

まとめ

これまでのところ、Apache ShardingSphere の HA 機能は MySQL および openGauss HA ソリューションに適用できることが証明されています。今後は、より多くの MySQL HA 製品を統合し、より多くのデータベース HA ソリューションをサポートする予定です。

いつものように、ご興味がございましたら、ぜひご参加いただき、Apache ShardingSphere プロジェクトに貢献していただければ幸いです。

Apache ShardingSphere オープンソース プロジェクトのリンク:

  • ShardingSphere GitHub
  • シャーディングスフィア Twitter
  • ShardingSphere Slack
  • 寄稿者のガイドライン

関連記事: