001package com.streamconverter.command.rule;
002
003import com.streamconverter.StreamProcessingException;
004import java.sql.Connection;
005import java.sql.PreparedStatement;
006import java.sql.ResultSet;
007import java.sql.ResultSetMetaData;
008import java.sql.SQLException;
009import java.util.Objects;
010import org.slf4j.Logger;
011import org.slf4j.LoggerFactory;
012
013/**
014 * HikariCP対応のデータベースフェッチルール
015 *
016 * <p>DatabaseFetchRuleの高性能版です。HikariCPを使用して接続の再利用により パフォーマンスを大幅に向上させます。特に大量のデータ処理や高頻度のデータベースアクセスが
017 * 必要な場合に効果的です。
018 *
019 * <p>使用例:
020 *
021 * <pre>{@code
022 * // HikariCP接続プールを作成
023 * HikariConnectionPoolConfig pool = new HikariConnectionPoolConfig("jdbc:h2:mem:testdb", 10, Duration.ofSeconds(30));
024 *
025 * // プール対応ルールを作成
026 * PooledDatabaseFetchRule rule = new PooledDatabaseFetchRule(
027 *     pool,
028 *     "SELECT name FROM users WHERE id = ?"
029 * );
030 *
031 * // 大量処理でも高速
032 * for (int i = 0; i < 10000; i++) {
033 *     String result = rule.apply(String.valueOf(i));
034 * }
035 *
036 * // 使用後はプールをシャットダウン
037 * pool.close();
038 * }</pre>
039 *
040 * <p>DatabaseFetchRuleとの違い:
041 *
042 * <ul>
043 *   <li>業界標準HikariCP使用によりパフォーマンス大幅向上
044 *   <li>接続リーク検出と自動回復
045 *   <li>複数スレッドからの同時アクセス対応
046 *   <li>詳細なプールメトリクス
047 * </ul>
048 */
049public class PooledDatabaseFetchRule implements IRule {
050  private static final Logger logger = LoggerFactory.getLogger(PooledDatabaseFetchRule.class);
051
052  private final HikariConnectionPoolConfig connectionPool;
053  private final String query;
054
055  /**
056   * コンストラクタ
057   *
058   * @param connectionPool HikariCP接続プール
059   * @param query データベースクエリ(SELECTクエリのみ許可)
060   * @throws IllegalArgumentException 無効なパラメータが指定された場合
061   * @throws SecurityException セキュリティ違反が検出された場合
062   */
063  public PooledDatabaseFetchRule(HikariConnectionPoolConfig connectionPool, String query) {
064    Objects.requireNonNull(connectionPool, "Connection pool cannot be null");
065    Objects.requireNonNull(query, "Query cannot be null");
066
067    this.connectionPool = connectionPool;
068    this.query = validateQuery(query.trim());
069
070    logger.info(
071        "PooledDatabaseFetchRule initialized - Query length: {}, Pool: {}",
072        this.query.length(),
073        connectionPool.getDetailedStats());
074  }
075
076  /**
077   * クエリを検証します(SQLインジェクション対策)
078   *
079   * @param queryString 検証対象のクエリ
080   * @return 検証済みのクエリ
081   * @throws SecurityException SQLインジェクションが検出された場合
082   */
083  private String validateQuery(String queryString) {
084    return SqlQueryUtils.validateQuery(queryString, logger);
085  }
086
087  private String sanitizeInput(String input) {
088    return SqlQueryUtils.sanitizeInput(input, logger);
089  }
090
091  /**
092   * ルールの適用を実行します。
093   *
094   * <p>コネクションプールから接続を取得してクエリを実行し、結果の先頭行・先頭列の値を返します。 接続はプールに自動的に返却されるため、高いパフォーマンスを実現します。
095   *
096   * @param input 変換対象の文字列(クエリパラメータとして使用)
097   * @return クエリ結果の先頭値、または空文字列(結果がない場合)
098   * @throws StreamProcessingException SQLExceptionが発生した場合
099   * @throws IllegalArgumentException inputがnullの場合(sanitizeInput経由)
100   */
101  @Override
102  public String apply(String input) {
103    logger.debug("Getting connection from pool: {}", connectionPool.getPoolStats());
104    try (Connection connection = connectionPool.getConnection();
105        PreparedStatement statement = connection.prepareStatement(query)) {
106
107      // 入力文字列をパラメータとして設定(クエリに「?」プレースホルダーがある場合)
108      if (query.contains("?") && input != null && !input.isEmpty()) {
109        String sanitizedInput = sanitizeInput(input);
110        if (sanitizedInput.isEmpty()) {
111          logger.warn(
112              "Input parameter was sanitized to empty string. Rejecting input for security reasons. Original input: {}",
113              input);
114          return "";
115        }
116
117        statement.setString(1, sanitizedInput);
118        logger.debug("Parameter set for prepared statement: length={}", sanitizedInput.length());
119      }
120
121      // クエリ実行
122      logger.debug("Executing query with pooled connection: {}", query);
123      try (ResultSet resultSet = statement.executeQuery()) {
124        // 結果の検証と処理
125        ResultSetMetaData metaData = resultSet.getMetaData();
126        int columnCount = metaData.getColumnCount();
127
128        // 結果がない場合
129        if (!resultSet.next()) {
130          logger.warn("クエリ結果が空です。");
131          return "";
132        }
133
134        // 列数の検証
135        if (columnCount != 1) {
136          logger.warn("クエリ結果が一列ではありません。列数: {}。先頭列の値を使用します。", columnCount);
137        }
138
139        // 先頭行の先頭列の値を取得
140        String value = resultSet.getString(1);
141
142        // 追加の行があるかチェック
143        boolean hasMoreRows = resultSet.next();
144        if (hasMoreRows) {
145          logger.warn("クエリ結果が複数行あります。先頭行の値を使用します。");
146        }
147
148        // nullチェック
149        if (value == null) {
150          logger.info("クエリ結果の先頭値がNULLです。");
151          return "";
152        }
153
154        // 結果が理想的(1行1列)かどうかをログに記録
155        if (columnCount == 1 && !hasMoreRows) {
156          logger.debug("データベースから単一値を取得しました(プール使用): {}", value);
157        } else {
158          logger.debug("データベースから先頭値を取得しました(プール使用): {}", value);
159        }
160
161        return value;
162      }
163
164    } catch (SQLException e) {
165      logger.error("プール接続でのデータベース操作中にエラーが発生しました: {}", e.getMessage(), e);
166      Throwable[] suppressed = e.getSuppressed();
167      if (suppressed != null) {
168        for (Throwable s : suppressed) {
169          logger.error("クローズ中に追加のエラーが発生しました: {}", s.getMessage(), s);
170        }
171      }
172      throw new StreamProcessingException(
173          "データベースフェッチに失敗しました: " + e.getMessage(), e);
174    }
175  }
176
177  /**
178   * プールの統計情報を取得
179   *
180   * @return プールの統計情報
181   */
182  public String getPoolStats() {
183    return connectionPool.getPoolStats();
184  }
185}