package com.streamconverter.command.rule;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A lightweight database connection pool.
 *
 * <p>Physical connections are opened lazily through {@link DriverManager}, handed out wrapped in
 * a pooled {@link Connection} and put back into an idle queue when the wrapper is closed.
 *
 * <p>Features:
 *
 * <ul>
 *   <li>Configurable maximum pool size
 *   <li>Configurable wait timeout when the pool is exhausted
 *   <li>Validation of connections on borrow and on return, with replacement of broken ones
 *   <li>Bookkeeping based on atomics and a {@link BlockingQueue}
 *   <li>Explicit shutdown that closes idle connections
 * </ul>
 *
 * <p>{@code activeConnections} counts every physical connection that is currently open, whether
 * it is borrowed or idle. Each code path that closes a physical connection must release exactly
 * one slot of that counter.
 */
public class DatabaseConnectionPool {
  private static final Logger logger = LoggerFactory.getLogger(DatabaseConnectionPool.class);

  private final String databaseUrl;
  private final int maxPoolSize;
  private final long connectionTimeoutMs;
  private final BlockingQueue<Connection> connectionPool;
  private final AtomicInteger activeConnections = new AtomicInteger(0);
  private final AtomicBoolean isShutdown = new AtomicBoolean(false);

  /**
   * Creates a pool with the default settings (at most 5 connections, 30 second wait timeout).
   *
   * @param databaseUrl JDBC URL passed to {@link DriverManager#getConnection(String)}
   */
  public DatabaseConnectionPool(String databaseUrl) {
    this(databaseUrl, 5, 30000);
  }

  /**
   * Creates a pool with explicit settings.
   *
   * @param databaseUrl JDBC URL passed to {@link DriverManager#getConnection(String)}
   * @param maxPoolSize maximum number of simultaneously open physical connections
   * @param connectionTimeoutMs how long {@link #getConnection()} waits for a free connection when
   *     the pool is exhausted, in milliseconds
   * @throws IllegalArgumentException if {@code maxPoolSize} is less than 1 (raised by the
   *     backing {@link ArrayBlockingQueue})
   */
  public DatabaseConnectionPool(String databaseUrl, int maxPoolSize, long connectionTimeoutMs) {
    this.databaseUrl = databaseUrl;
    this.maxPoolSize = maxPoolSize;
    this.connectionTimeoutMs = connectionTimeoutMs;
    this.connectionPool = new ArrayBlockingQueue<>(maxPoolSize);

    // NOTE(review): JDBC URLs can embed credentials; confirm that logging the full URL is
    // acceptable for this deployment.
    logger.info(
        "DatabaseConnectionPool initialized - URL: {}, MaxPoolSize: {}, TimeoutMs: {}",
        databaseUrl,
        maxPoolSize,
        connectionTimeoutMs);
  }

  /**
   * Borrows a connection from the pool.
   *
   * <p>An idle connection is reused when one is available and still valid. Otherwise a new
   * connection is opened while the pool is below its maximum size. When the pool is exhausted the
   * call waits up to the configured timeout for a connection to be returned.
   *
   * @return a pooled connection; closing it returns the underlying connection to this pool
   * @throws SQLException if the pool is shut down, a new connection cannot be opened, the wait
   *     times out, or the waiting thread is interrupted
   */
  public Connection getConnection() throws SQLException {
    if (isShutdown.get()) {
      throw new SQLException("Connection pool is shutdown");
    }

    // 1. Try an idle connection first.
    Connection idle = connectionPool.poll();
    if (idle != null) {
      if (isConnectionValid(idle)) {
        logger.debug("Reused connection from pool");
        return new PooledConnection(idle, this);
      }
      // A stale connection still occupies a slot; release it so a replacement can be opened.
      closeAndRelease(idle, "Failed to close invalid connection: {}");
    }

    // 2. Open a new connection while below the size limit.
    Connection created = createConnectionIfCapacity();
    if (created != null) {
      return created;
    }

    // 3. Pool exhausted: wait for a returned connection until the deadline passes.
    final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(connectionTimeoutMs);
    try {
      long remainingNanos = deadline - System.nanoTime();
      do {
        Connection waited = connectionPool.poll(remainingNanos, TimeUnit.NANOSECONDS);
        if (waited == null) {
          break;
        }
        if (isConnectionValid(waited)) {
          logger.debug("Got connection after waiting");
          return new PooledConnection(waited, this);
        }
        // The returned connection went bad while idle: free its slot and replace it if possible.
        closeAndRelease(waited, "Failed to close invalid waited connection: {}");
        created = createConnectionIfCapacity();
        if (created != null) {
          return created;
        }
        remainingNanos = deadline - System.nanoTime();
      } while (remainingNanos > 0);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new SQLException("Interrupted while waiting for connection", e);
    }

    throw new SQLException("Failed to get connection within timeout");
  }

  /**
   * Opens a new physical connection if a slot can be reserved.
   *
   * @return the wrapped new connection, or {@code null} when the pool is already at its maximum
   * @throws SQLException if the driver fails to open the connection; the reserved slot is
   *     released before the exception propagates
   */
  private Connection createConnectionIfCapacity() throws SQLException {
    if (!reserveSlot()) {
      return null;
    }
    try {
      Connection raw = DriverManager.getConnection(databaseUrl);
      logger.debug("Created new connection - Active connections: {}", activeConnections.get());
      return new PooledConnection(raw, this);
    } catch (SQLException | RuntimeException e) {
      activeConnections.decrementAndGet();
      logger.error("Failed to create new connection: {}", e.getMessage());
      throw e;
    }
  }

  /**
   * Atomically claims one slot of {@code activeConnections} without exceeding the maximum.
   *
   * @return {@code true} if a slot was reserved, {@code false} if the pool is at its maximum
   */
  private boolean reserveSlot() {
    while (true) {
      int current = activeConnections.get();
      if (current >= maxPoolSize) {
        return false;
      }
      if (activeConnections.compareAndSet(current, current + 1)) {
        return true;
      }
    }
  }

  /**
   * Closes a physical connection and releases its slot, even when {@code close()} fails.
   *
   * @param connection the physical connection to close
   * @param warnFormat SLF4J message format (one {@code {}} placeholder) used if closing fails
   */
  private void closeAndRelease(Connection connection, String warnFormat) {
    try {
      connection.close();
    } catch (SQLException e) {
      logger.warn(warnFormat, e.getMessage());
    } finally {
      activeConnections.decrementAndGet();
    }
  }

  /**
   * Returns a physical connection to the pool.
   *
   * <p>The connection is closed instead of being pooled when the pool is shut down, the
   * connection is no longer valid, or the idle queue is full.
   *
   * @param connection the physical connection being returned
   */
  void returnConnection(Connection connection) {
    if (isShutdown.get() || !isConnectionValid(connection)) {
      closeAndRelease(connection, "Failed to close returned connection: {}");
      return;
    }

    if (!connectionPool.offer(connection)) {
      closeAndRelease(connection, "Failed to close excess connection: {}");
      logger.debug("Closed excess connection - Active connections: {}", activeConnections.get());
      return;
    }

    // shutdown() may have drained the queue between the check above and the offer. If so, take
    // the connection back out so it does not stay open forever.
    if (isShutdown.get() && connectionPool.remove(connection)) {
      closeAndRelease(connection, "Failed to close returned connection: {}");
      return;
    }

    logger.debug("Returned connection to pool");
  }

  /**
   * Checks whether a connection is usable.
   *
   * @param connection the connection to check; may be {@code null}
   * @return {@code true} if the connection is non-null, not closed and passes {@code isValid(1)}
   */
  private boolean isConnectionValid(Connection connection) {
    try {
      return connection != null && !connection.isClosed() && connection.isValid(1);
    } catch (SQLException e) {
      return false;
    }
  }

  /**
   * Returns a snapshot of the pool counters.
   *
   * @return a string with the number of open, idle and maximum connections
   */
  public String getPoolStats() {
    return String.format(
        "ConnectionPool[active=%d, pooled=%d, max=%d]",
        activeConnections.get(), connectionPool.size(), maxPoolSize);
  }

  /**
   * Shuts the pool down and closes all idle connections.
   *
   * <p>Connections that are still borrowed are closed when they are returned.
   */
  public void shutdown() {
    isShutdown.set(true);
    logger.info("Shutting down connection pool");

    Connection connection;
    while ((connection = connectionPool.poll()) != null) {
      closeAndRelease(connection, "Failed to close connection during shutdown: {}");
    }

    logger.info("Connection pool shutdown completed - Final stats: {}", getPoolStats());
  }

  /**
   * Connection handed out to borrowers.
   *
   * <p>{@link #close()} returns the underlying connection to the pool instead of closing it. Once
   * closed, the wrapper no longer forwards calls to the underlying connection, because that
   * connection may already be in use by another borrower.
   */
  private static class PooledConnection implements Connection {
    private final Connection delegate;
    private final DatabaseConnectionPool pool;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    PooledConnection(Connection delegate, DatabaseConnectionPool pool) {
      this.delegate = delegate;
      this.pool = pool;
    }

    /**
     * Returns the underlying connection while this wrapper is open.
     *
     * @throws SQLException if this wrapper has already been closed
     */
    private Connection open() throws SQLException {
      if (closed.get()) {
        throw new SQLException("Connection is closed");
      }
      return delegate;
    }

    /** Returns the underlying connection to the pool; only the first call has an effect. */
    @Override
    public void close() throws SQLException {
      if (closed.compareAndSet(false, true)) {
        pool.returnConnection(delegate);
      }
    }

    // All remaining Connection methods forward to the delegate while the wrapper is open.
    @Override
    public java.sql.Statement createStatement() throws SQLException {
      return open().createStatement();
    }

    @Override
    public java.sql.PreparedStatement prepareStatement(String sql) throws SQLException {
      return open().prepareStatement(sql);
    }

    @Override
    public java.sql.CallableStatement prepareCall(String sql) throws SQLException {
      return open().prepareCall(sql);
    }

    @Override
    public String nativeSQL(String sql) throws SQLException {
      return open().nativeSQL(sql);
    }

    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
      open().setAutoCommit(autoCommit);
    }

    @Override
    public boolean getAutoCommit() throws SQLException {
      return open().getAutoCommit();
    }

    @Override
    public void commit() throws SQLException {
      open().commit();
    }

    @Override
    public void rollback() throws SQLException {
      open().rollback();
    }

    @Override
    public boolean isClosed() throws SQLException {
      return closed.get() || delegate.isClosed();
    }

    @Override
    public java.sql.DatabaseMetaData getMetaData() throws SQLException {
      return open().getMetaData();
    }

    @Override
    public void setReadOnly(boolean readOnly) throws SQLException {
      open().setReadOnly(readOnly);
    }

    @Override
    public boolean isReadOnly() throws SQLException {
      return open().isReadOnly();
    }

    @Override
    public void setCatalog(String catalog) throws SQLException {
      open().setCatalog(catalog);
    }

    @Override
    public String getCatalog() throws SQLException {
      return open().getCatalog();
    }

    @Override
    public void setTransactionIsolation(int level) throws SQLException {
      open().setTransactionIsolation(level);
    }

    @Override
    public int getTransactionIsolation() throws SQLException {
      return open().getTransactionIsolation();
    }

    @Override
    public java.sql.SQLWarning getWarnings() throws SQLException {
      return open().getWarnings();
    }

    @Override
    public void clearWarnings() throws SQLException {
      open().clearWarnings();
    }

    @Override
    public java.sql.Statement createStatement(int resultSetType, int resultSetConcurrency)
        throws SQLException {
      return open().createStatement(resultSetType, resultSetConcurrency);
    }

    @Override
    public java.sql.PreparedStatement prepareStatement(
        String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
      return open().prepareStatement(sql, resultSetType, resultSetConcurrency);
    }

    @Override
    public java.sql.CallableStatement prepareCall(
        String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
      return open().prepareCall(sql, resultSetType, resultSetConcurrency);
    }

    @Override
    public java.util.Map<String, Class<?>> getTypeMap() throws SQLException {
      return open().getTypeMap();
    }

    @Override
    public void setTypeMap(java.util.Map<String, Class<?>> map) throws SQLException {
      open().setTypeMap(map);
    }

    @Override
    public void setHoldability(int holdability) throws SQLException {
      open().setHoldability(holdability);
    }

    @Override
    public int getHoldability() throws SQLException {
      return open().getHoldability();
    }

    @Override
    public java.sql.Savepoint setSavepoint() throws SQLException {
      return open().setSavepoint();
    }

    @Override
    public java.sql.Savepoint setSavepoint(String name) throws SQLException {
      return open().setSavepoint(name);
    }

    @Override
    public void rollback(java.sql.Savepoint savepoint) throws SQLException {
      open().rollback(savepoint);
    }

    @Override
    public void releaseSavepoint(java.sql.Savepoint savepoint) throws SQLException {
      open().releaseSavepoint(savepoint);
    }

    @Override
    public java.sql.Statement createStatement(
        int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
      return open().createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
    }

    @Override
    public java.sql.PreparedStatement prepareStatement(
        String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
        throws SQLException {
      return open()
          .prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
    }

    @Override
    public java.sql.CallableStatement prepareCall(
        String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
        throws SQLException {
      return open().prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
    }

    @Override
    public java.sql.PreparedStatement prepareStatement(String sql, int autoGeneratedKeys)
        throws SQLException {
      return open().prepareStatement(sql, autoGeneratedKeys);
    }

    @Override
    public java.sql.PreparedStatement prepareStatement(String sql, int[] columnIndexes)
        throws SQLException {
      return open().prepareStatement(sql, columnIndexes);
    }

    @Override
    public java.sql.PreparedStatement prepareStatement(String sql, String[] columnNames)
        throws SQLException {
      return open().prepareStatement(sql, columnNames);
    }

    @Override
    public java.sql.Clob createClob() throws SQLException {
      return open().createClob();
    }

    @Override
    public java.sql.Blob createBlob() throws SQLException {
      return open().createBlob();
    }

    @Override
    public java.sql.NClob createNClob() throws SQLException {
      return open().createNClob();
    }

    @Override
    public java.sql.SQLXML createSQLXML() throws SQLException {
      return open().createSQLXML();
    }

    /** Reports {@code false} once this wrapper is closed, without touching the delegate. */
    @Override
    public boolean isValid(int timeout) throws SQLException {
      return !closed.get() && delegate.isValid(timeout);
    }

    @Override
    public void setClientInfo(String name, String value) throws java.sql.SQLClientInfoException {
      clientInfoTarget().setClientInfo(name, value);
    }

    @Override
    public void setClientInfo(java.util.Properties properties)
        throws java.sql.SQLClientInfoException {
      clientInfoTarget().setClientInfo(properties);
    }

    /**
     * Same guard as {@link #open()}, but raising the exception type that the {@code
     * setClientInfo} methods are allowed to throw.
     */
    private Connection clientInfoTarget() throws java.sql.SQLClientInfoException {
      if (closed.get()) {
        throw new java.sql.SQLClientInfoException(
            "Connection is closed",
            java.util.Collections.<String, java.sql.ClientInfoStatus>emptyMap());
      }
      return delegate;
    }

    @Override
    public String getClientInfo(String name) throws SQLException {
      return open().getClientInfo(name);
    }

    @Override
    public java.util.Properties getClientInfo() throws SQLException {
      return open().getClientInfo();
    }

    @Override
    public java.sql.Array createArrayOf(String typeName, Object[] elements) throws SQLException {
      return open().createArrayOf(typeName, elements);
    }

    @Override
    public java.sql.Struct createStruct(String typeName, Object[] attributes) throws SQLException {
      return open().createStruct(typeName, attributes);
    }

    @Override
    public void setSchema(String schema) throws SQLException {
      open().setSchema(schema);
    }

    @Override
    public String getSchema() throws SQLException {
      return open().getSchema();
    }

    /** Does nothing once this wrapper is closed, so a returned connection is never aborted. */
    @Override
    public void abort(java.util.concurrent.Executor executor) throws SQLException {
      if (closed.get()) {
        return;
      }
      delegate.abort(executor);
    }

    @Override
    public void setNetworkTimeout(java.util.concurrent.Executor executor, int milliseconds)
        throws SQLException {
      open().setNetworkTimeout(executor, milliseconds);
    }

    @Override
    public int getNetworkTimeout() throws SQLException {
      return open().getNetworkTimeout();
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
      return open().unwrap(iface);
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
      return open().isWrapperFor(iface);
    }
  }
}