001package com.streamconverter.command.rule;
002
003import java.sql.Connection;
004import java.sql.DriverManager;
005import java.sql.SQLException;
006import java.util.concurrent.ArrayBlockingQueue;
007import java.util.concurrent.BlockingQueue;
008import java.util.concurrent.TimeUnit;
009import java.util.concurrent.atomic.AtomicBoolean;
010import java.util.concurrent.atomic.AtomicInteger;
011import org.slf4j.Logger;
012import org.slf4j.LoggerFactory;
013
014/**
015 * データベース接続プールの実装
016 *
017 * <p>DatabaseFetchRuleで使用するための軽量なコネクションプール実装です。 接続の再利用によりパフォーマンスを向上させ、リソース使用量を最適化します。
018 *
019 * <p>機能:
020 *
021 * <ul>
022 *   <li>接続プールサイズの設定可能
023 *   <li>接続タイムアウトの設定
024 *   <li>自動的な接続検証と回復
025 *   <li>スレッドセーフな設計
026 *   <li>適切なリソース管理
027 * </ul>
028 */
029public class DatabaseConnectionPool {
030  private static final Logger logger = LoggerFactory.getLogger(DatabaseConnectionPool.class);
031
032  private final String databaseUrl;
033  private final int maxPoolSize;
034  private final long connectionTimeoutMs;
035  private final BlockingQueue<Connection> connectionPool;
036  private final AtomicInteger activeConnections = new AtomicInteger(0);
037  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
038
039  /**
040   * デフォルト設定でコネクションプールを初期化
041   *
042   * @param databaseUrl データベースURL
043   */
044  public DatabaseConnectionPool(String databaseUrl) {
045    this(databaseUrl, 5, 30000); // デフォルト: 最大5接続、30秒タイムアウト
046  }
047
048  /**
049   * 設定可能なコネクションプールを初期化
050   *
051   * @param databaseUrl データベースURL
052   * @param maxPoolSize 最大プールサイズ
053   * @param connectionTimeoutMs 接続タイムアウト(ミリ秒)
054   */
055  public DatabaseConnectionPool(String databaseUrl, int maxPoolSize, long connectionTimeoutMs) {
056    this.databaseUrl = databaseUrl;
057    this.maxPoolSize = maxPoolSize;
058    this.connectionTimeoutMs = connectionTimeoutMs;
059    this.connectionPool = new ArrayBlockingQueue<>(maxPoolSize);
060
061    logger.info(
062        "DatabaseConnectionPool initialized - URL: {}, MaxPoolSize: {}, TimeoutMs: {}",
063        databaseUrl,
064        maxPoolSize,
065        connectionTimeoutMs);
066  }
067
068  /**
069   * プールから接続を取得
070   *
071   * @return データベース接続
072   * @throws SQLException 接続取得に失敗した場合
073   */
074  public Connection getConnection() throws SQLException {
075    if (isShutdown.get()) {
076      throw new SQLException("Connection pool is shutdown");
077    }
078
079    // プールから既存の接続を試行
080    Connection connection = connectionPool.poll();
081    if (connection != null && isConnectionValid(connection)) {
082      logger.debug("Reused connection from pool");
083      return new PooledConnection(connection, this);
084    }
085
086    // 既存接続が無効な場合はクローズ
087    if (connection != null) {
088      try {
089        connection.close();
090      } catch (SQLException e) {
091        logger.warn("Failed to close invalid connection: {}", e.getMessage());
092      }
093    }
094
095    // 新しい接続を作成(プールサイズ制限内)
096    if (activeConnections.get() < maxPoolSize) {
097      try {
098        connection = DriverManager.getConnection(databaseUrl);
099        activeConnections.incrementAndGet();
100        logger.debug("Created new connection - Active connections: {}", activeConnections.get());
101        return new PooledConnection(connection, this);
102      } catch (SQLException e) {
103        logger.error("Failed to create new connection: {}", e.getMessage());
104        throw e;
105      }
106    }
107
108    // プールが満杯の場合は待機
109    try {
110      connection = connectionPool.poll(connectionTimeoutMs, TimeUnit.MILLISECONDS);
111      if (connection != null) {
112        if (isConnectionValid(connection)) {
113          logger.debug("Got connection after waiting");
114          return new PooledConnection(connection, this);
115        } else {
116          // 無効な接続をクローズして再試行
117          try {
118            connection.close();
119          } catch (SQLException e) {
120            logger.warn("Failed to close invalid waited connection: {}", e.getMessage());
121          }
122          activeConnections.decrementAndGet();
123        }
124      }
125    } catch (InterruptedException e) {
126      Thread.currentThread().interrupt();
127      throw new SQLException("Interrupted while waiting for connection", e);
128    }
129
130    throw new SQLException("Failed to get connection within timeout");
131  }
132
133  /**
134   * 接続をプールに返却
135   *
136   * @param connection 返却する接続
137   */
138  void returnConnection(Connection connection) {
139    if (isShutdown.get() || !isConnectionValid(connection)) {
140      try {
141        connection.close();
142        activeConnections.decrementAndGet();
143      } catch (SQLException e) {
144        logger.warn("Failed to close returned connection: {}", e.getMessage());
145      }
146      return;
147    }
148
149    if (!connectionPool.offer(connection)) {
150      // プールが満杯の場合は接続をクローズ
151      try {
152        connection.close();
153        activeConnections.decrementAndGet();
154        logger.debug("Closed excess connection - Active connections: {}", activeConnections.get());
155      } catch (SQLException e) {
156        logger.warn("Failed to close excess connection: {}", e.getMessage());
157      }
158    } else {
159      logger.debug("Returned connection to pool");
160    }
161  }
162
163  /**
164   * 接続の有効性を検証
165   *
166   * @param connection 検証する接続
167   * @return 有効な場合true
168   */
169  private boolean isConnectionValid(Connection connection) {
170    try {
171      return connection != null && !connection.isClosed() && connection.isValid(1);
172    } catch (SQLException e) {
173      return false;
174    }
175  }
176
177  /**
178   * プールの現在の統計情報を取得
179   *
180   * @return 統計情報文字列
181   */
182  public String getPoolStats() {
183    return String.format(
184        "ConnectionPool[active=%d, pooled=%d, max=%d]",
185        activeConnections.get(), connectionPool.size(), maxPoolSize);
186  }
187
188  /** プールをシャットダウンし、全ての接続をクローズ */
189  public void shutdown() {
190    isShutdown.set(true);
191    logger.info("Shutting down connection pool");
192
193    // プール内の全接続をクローズ
194    Connection connection;
195    while ((connection = connectionPool.poll()) != null) {
196      try {
197        connection.close();
198      } catch (SQLException e) {
199        logger.warn("Failed to close connection during shutdown: {}", e.getMessage());
200      }
201    }
202
203    logger.info("Connection pool shutdown completed - Final stats: {}", getPoolStats());
204  }
205
206  /** プール管理されたConnection実装 */
207  private static class PooledConnection implements Connection {
208    private final Connection delegate;
209    private final DatabaseConnectionPool pool;
210    private boolean closed = false;
211
212    PooledConnection(Connection delegate, DatabaseConnectionPool pool) {
213      this.delegate = delegate;
214      this.pool = pool;
215    }
216
217    @Override
218    public void close() throws SQLException {
219      if (!closed) {
220        closed = true;
221        pool.returnConnection(delegate);
222      }
223    }
224
225    // Connection インターフェース のすべてのメソッドを delegate に委譲
226    @Override
227    public java.sql.Statement createStatement() throws SQLException {
228      return delegate.createStatement();
229    }
230
231    @Override
232    public java.sql.PreparedStatement prepareStatement(String sql) throws SQLException {
233      return delegate.prepareStatement(sql);
234    }
235
236    @Override
237    public java.sql.CallableStatement prepareCall(String sql) throws SQLException {
238      return delegate.prepareCall(sql);
239    }
240
241    @Override
242    public String nativeSQL(String sql) throws SQLException {
243      return delegate.nativeSQL(sql);
244    }
245
246    @Override
247    public void setAutoCommit(boolean autoCommit) throws SQLException {
248      delegate.setAutoCommit(autoCommit);
249    }
250
251    @Override
252    public boolean getAutoCommit() throws SQLException {
253      return delegate.getAutoCommit();
254    }
255
256    @Override
257    public void commit() throws SQLException {
258      delegate.commit();
259    }
260
261    @Override
262    public void rollback() throws SQLException {
263      delegate.rollback();
264    }
265
266    @Override
267    public boolean isClosed() throws SQLException {
268      return closed || delegate.isClosed();
269    }
270
271    @Override
272    public java.sql.DatabaseMetaData getMetaData() throws SQLException {
273      return delegate.getMetaData();
274    }
275
276    @Override
277    public void setReadOnly(boolean readOnly) throws SQLException {
278      delegate.setReadOnly(readOnly);
279    }
280
281    @Override
282    public boolean isReadOnly() throws SQLException {
283      return delegate.isReadOnly();
284    }
285
286    @Override
287    public void setCatalog(String catalog) throws SQLException {
288      delegate.setCatalog(catalog);
289    }
290
291    @Override
292    public String getCatalog() throws SQLException {
293      return delegate.getCatalog();
294    }
295
296    @Override
297    public void setTransactionIsolation(int level) throws SQLException {
298      delegate.setTransactionIsolation(level);
299    }
300
301    @Override
302    public int getTransactionIsolation() throws SQLException {
303      return delegate.getTransactionIsolation();
304    }
305
306    @Override
307    public java.sql.SQLWarning getWarnings() throws SQLException {
308      return delegate.getWarnings();
309    }
310
311    @Override
312    public void clearWarnings() throws SQLException {
313      delegate.clearWarnings();
314    }
315
316    @Override
317    public java.sql.Statement createStatement(int resultSetType, int resultSetConcurrency)
318        throws SQLException {
319      return delegate.createStatement(resultSetType, resultSetConcurrency);
320    }
321
322    @Override
323    public java.sql.PreparedStatement prepareStatement(
324        String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
325      return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency);
326    }
327
328    @Override
329    public java.sql.CallableStatement prepareCall(
330        String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
331      return delegate.prepareCall(sql, resultSetType, resultSetConcurrency);
332    }
333
334    @Override
335    public java.util.Map<String, Class<?>> getTypeMap() throws SQLException {
336      return delegate.getTypeMap();
337    }
338
339    @Override
340    public void setTypeMap(java.util.Map<String, Class<?>> map) throws SQLException {
341      delegate.setTypeMap(map);
342    }
343
344    @Override
345    public void setHoldability(int holdability) throws SQLException {
346      delegate.setHoldability(holdability);
347    }
348
349    @Override
350    public int getHoldability() throws SQLException {
351      return delegate.getHoldability();
352    }
353
354    @Override
355    public java.sql.Savepoint setSavepoint() throws SQLException {
356      return delegate.setSavepoint();
357    }
358
359    @Override
360    public java.sql.Savepoint setSavepoint(String name) throws SQLException {
361      return delegate.setSavepoint(name);
362    }
363
364    @Override
365    public void rollback(java.sql.Savepoint savepoint) throws SQLException {
366      delegate.rollback(savepoint);
367    }
368
369    @Override
370    public void releaseSavepoint(java.sql.Savepoint savepoint) throws SQLException {
371      delegate.releaseSavepoint(savepoint);
372    }
373
374    @Override
375    public java.sql.Statement createStatement(
376        int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
377      return delegate.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
378    }
379
380    @Override
381    public java.sql.PreparedStatement prepareStatement(
382        String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
383        throws SQLException {
384      return delegate.prepareStatement(
385          sql, resultSetType, resultSetConcurrency, resultSetHoldability);
386    }
387
388    @Override
389    public java.sql.CallableStatement prepareCall(
390        String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
391        throws SQLException {
392      return delegate.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
393    }
394
395    @Override
396    public java.sql.PreparedStatement prepareStatement(String sql, int autoGeneratedKeys)
397        throws SQLException {
398      return delegate.prepareStatement(sql, autoGeneratedKeys);
399    }
400
401    @Override
402    public java.sql.PreparedStatement prepareStatement(String sql, int[] columnIndexes)
403        throws SQLException {
404      return delegate.prepareStatement(sql, columnIndexes);
405    }
406
407    @Override
408    public java.sql.PreparedStatement prepareStatement(String sql, String[] columnNames)
409        throws SQLException {
410      return delegate.prepareStatement(sql, columnNames);
411    }
412
413    @Override
414    public java.sql.Clob createClob() throws SQLException {
415      return delegate.createClob();
416    }
417
418    @Override
419    public java.sql.Blob createBlob() throws SQLException {
420      return delegate.createBlob();
421    }
422
423    @Override
424    public java.sql.NClob createNClob() throws SQLException {
425      return delegate.createNClob();
426    }
427
428    @Override
429    public java.sql.SQLXML createSQLXML() throws SQLException {
430      return delegate.createSQLXML();
431    }
432
433    @Override
434    public boolean isValid(int timeout) throws SQLException {
435      return delegate.isValid(timeout);
436    }
437
438    @Override
439    public void setClientInfo(String name, String value) throws java.sql.SQLClientInfoException {
440      delegate.setClientInfo(name, value);
441    }
442
443    @Override
444    public void setClientInfo(java.util.Properties properties)
445        throws java.sql.SQLClientInfoException {
446      delegate.setClientInfo(properties);
447    }
448
449    @Override
450    public String getClientInfo(String name) throws SQLException {
451      return delegate.getClientInfo(name);
452    }
453
454    @Override
455    public java.util.Properties getClientInfo() throws SQLException {
456      return delegate.getClientInfo();
457    }
458
459    @Override
460    public java.sql.Array createArrayOf(String typeName, Object[] elements) throws SQLException {
461      return delegate.createArrayOf(typeName, elements);
462    }
463
464    @Override
465    public java.sql.Struct createStruct(String typeName, Object[] attributes) throws SQLException {
466      return delegate.createStruct(typeName, attributes);
467    }
468
469    @Override
470    public void setSchema(String schema) throws SQLException {
471      delegate.setSchema(schema);
472    }
473
474    @Override
475    public String getSchema() throws SQLException {
476      return delegate.getSchema();
477    }
478
479    @Override
480    public void abort(java.util.concurrent.Executor executor) throws SQLException {
481      delegate.abort(executor);
482    }
483
484    @Override
485    public void setNetworkTimeout(java.util.concurrent.Executor executor, int milliseconds)
486        throws SQLException {
487      delegate.setNetworkTimeout(executor, milliseconds);
488    }
489
490    @Override
491    public int getNetworkTimeout() throws SQLException {
492      return delegate.getNetworkTimeout();
493    }
494
495    @Override
496    public <T> T unwrap(Class<T> iface) throws SQLException {
497      return delegate.unwrap(iface);
498    }
499
500    @Override
501    public boolean isWrapperFor(Class<?> iface) throws SQLException {
502      return delegate.isWrapperFor(iface);
503    }
504  }
505}