001package com.streamconverter.command.rule; 002 003import com.streamconverter.StreamProcessingException; 004import java.sql.Connection; 005import java.sql.PreparedStatement; 006import java.sql.ResultSet; 007import java.sql.ResultSetMetaData; 008import java.sql.SQLException; 009import java.util.Objects; 010import org.slf4j.Logger; 011import org.slf4j.LoggerFactory; 012 013/** 014 * HikariCP対応のデータベースフェッチルール 015 * 016 * <p>DatabaseFetchRuleの高性能版です。HikariCPを使用して接続の再利用により パフォーマンスを大幅に向上させます。特に大量のデータ処理や高頻度のデータベースアクセスが 017 * 必要な場合に効果的です。 018 * 019 * <p>使用例: 020 * 021 * <pre>{@code 022 * // HikariCP接続プールを作成 023 * HikariConnectionPoolConfig pool = new HikariConnectionPoolConfig("jdbc:h2:mem:testdb", 10, Duration.ofSeconds(30)); 024 * 025 * // プール対応ルールを作成 026 * PooledDatabaseFetchRule rule = new PooledDatabaseFetchRule( 027 * pool, 028 * "SELECT name FROM users WHERE id = ?" 029 * ); 030 * 031 * // 大量処理でも高速 032 * for (int i = 0; i < 10000; i++) { 033 * String result = rule.apply(String.valueOf(i)); 034 * } 035 * 036 * // 使用後はプールをシャットダウン 037 * pool.close(); 038 * }</pre> 039 * 040 * <p>DatabaseFetchRuleとの違い: 041 * 042 * <ul> 043 * <li>業界標準HikariCP使用によりパフォーマンス大幅向上 044 * <li>接続リーク検出と自動回復 045 * <li>複数スレッドからの同時アクセス対応 046 * <li>詳細なプールメトリクス 047 * </ul> 048 */ 049public class PooledDatabaseFetchRule implements IRule { 050 private static final Logger logger = LoggerFactory.getLogger(PooledDatabaseFetchRule.class); 051 052 private final HikariConnectionPoolConfig connectionPool; 053 private final String query; 054 055 /** 056 * コンストラクタ 057 * 058 * @param connectionPool HikariCP接続プール 059 * @param query データベースクエリ(SELECTクエリのみ許可) 060 * @throws IllegalArgumentException 無効なパラメータが指定された場合 061 * @throws SecurityException セキュリティ違反が検出された場合 062 */ 063 public PooledDatabaseFetchRule(HikariConnectionPoolConfig connectionPool, String query) { 064 Objects.requireNonNull(connectionPool, "Connection pool cannot be null"); 065 Objects.requireNonNull(query, "Query cannot be null"); 066 067 this.connectionPool = connectionPool; 068 this.query = validateQuery(query.trim()); 069 070 logger.info( 071 "PooledDatabaseFetchRule initialized - Query length: {}, Pool: {}", 072 this.query.length(), 073 connectionPool.getDetailedStats()); 074 } 075 076 /** 077 * クエリを検証します(SQLインジェクション対策) 078 * 079 * @param queryString 検証対象のクエリ 080 * @return 検証済みのクエリ 081 * @throws SecurityException SQLインジェクションが検出された場合 082 */ 083 private String validateQuery(String queryString) { 084 return SqlQueryUtils.validateQuery(queryString, logger); 085 } 086 087 private String sanitizeInput(String input) { 088 return SqlQueryUtils.sanitizeInput(input, logger); 089 } 090 091 /** 092 * ルールの適用を実行します。 093 * 094 * <p>コネクションプールから接続を取得してクエリを実行し、結果の先頭行・先頭列の値を返します。 接続はプールに自動的に返却されるため、高いパフォーマンスを実現します。 095 * 096 * @param input 変換対象の文字列(クエリパラメータとして使用) 097 * @return クエリ結果の先頭値、または空文字列(結果がない場合) 098 * @throws StreamProcessingException SQLExceptionが発生した場合 099 * @throws IllegalArgumentException inputがnullの場合(sanitizeInput経由) 100 */ 101 @Override 102 public String apply(String input) { 103 logger.debug("Getting connection from pool: {}", connectionPool.getPoolStats()); 104 try (Connection connection = connectionPool.getConnection(); 105 PreparedStatement statement = connection.prepareStatement(query)) { 106 107 // 入力文字列をパラメータとして設定(クエリに「?」プレースホルダーがある場合) 108 if (query.contains("?") && input != null && !input.isEmpty()) { 109 String sanitizedInput = sanitizeInput(input); 110 if (sanitizedInput.isEmpty()) { 111 logger.warn( 112 "Input parameter was sanitized to empty string. Rejecting input for security reasons. Original input: {}", 113 input); 114 return ""; 115 } 116 117 statement.setString(1, sanitizedInput); 118 logger.debug("Parameter set for prepared statement: length={}", sanitizedInput.length()); 119 } 120 121 // クエリ実行 122 logger.debug("Executing query with pooled connection: {}", query); 123 try (ResultSet resultSet = statement.executeQuery()) { 124 // 結果の検証と処理 125 ResultSetMetaData metaData = resultSet.getMetaData(); 126 int columnCount = metaData.getColumnCount(); 127 128 // 結果がない場合 129 if (!resultSet.next()) { 130 logger.warn("クエリ結果が空です。"); 131 return ""; 132 } 133 134 // 列数の検証 135 if (columnCount != 1) { 136 logger.warn("クエリ結果が一列ではありません。列数: {}。先頭列の値を使用します。", columnCount); 137 } 138 139 // 先頭行の先頭列の値を取得 140 String value = resultSet.getString(1); 141 142 // 追加の行があるかチェック 143 boolean hasMoreRows = resultSet.next(); 144 if (hasMoreRows) { 145 logger.warn("クエリ結果が複数行あります。先頭行の値を使用します。"); 146 } 147 148 // nullチェック 149 if (value == null) { 150 logger.info("クエリ結果の先頭値がNULLです。"); 151 return ""; 152 } 153 154 // 結果が理想的(1行1列)かどうかをログに記録 155 if (columnCount == 1 && !hasMoreRows) { 156 logger.debug("データベースから単一値を取得しました(プール使用): {}", value); 157 } else { 158 logger.debug("データベースから先頭値を取得しました(プール使用): {}", value); 159 } 160 161 return value; 162 } 163 164 } catch (SQLException e) { 165 logger.error("プール接続でのデータベース操作中にエラーが発生しました: {}", e.getMessage(), e); 166 Throwable[] suppressed = e.getSuppressed(); 167 if (suppressed != null) { 168 for (Throwable s : suppressed) { 169 logger.error("クローズ中に追加のエラーが発生しました: {}", s.getMessage(), s); 170 } 171 } 172 throw new StreamProcessingException( 173 "データベースフェッチに失敗しました: " + e.getMessage(), e); 174 } 175 } 176 177 /** 178 * プールの統計情報を取得 179 * 180 * @return プールの統計情報 181 */ 182 public String getPoolStats() { 183 return connectionPool.getPoolStats(); 184 } 185}