001package com.streamconverter; 002 003import com.streamconverter.command.IStreamCommand; 004import com.streamconverter.context.ExecutionContext; 005import java.io.IOException; 006import java.io.InputStream; 007import java.io.OutputStream; 008import java.io.PipedInputStream; 009import java.io.PipedOutputStream; 010import java.util.ArrayList; 011import java.util.List; 012import java.util.Objects; 013import java.util.concurrent.CompletableFuture; 014import java.util.concurrent.ExecutionException; 015import java.util.concurrent.ExecutorService; 016import java.util.concurrent.Executors; 017import java.util.concurrent.TimeUnit; 018import java.util.concurrent.TimeoutException; 019import org.slf4j.Logger; 020import org.slf4j.LoggerFactory; 021 022/** 023 * ストリーム変換クラス。 024 * 025 * <p>ストリームを変換するクラス。ストリームを変換するコマンドを指定して、ストリームを変換する。 026 * 027 * <p>ストリームを変換するコマンドは、IStreamCommandインターフェースを実装したクラスである必要がある。 028 */ 029public class StreamConverter { 030 031 /** AutoCloseableラッパーでExecutorServiceのリソース管理を改善 */ 032 private static class AutoCloseableExecutorService implements AutoCloseable { 033 private final ExecutorService executor; 034 035 public AutoCloseableExecutorService(ExecutorService executor) { 036 this.executor = executor; 037 } 038 039 public CompletableFuture<CommandResult> supplyAsync( 040 java.util.function.Supplier<CommandResult> supplier) { 041 return CompletableFuture.supplyAsync(supplier, executor); 042 } 043 044 @Override 045 public void close() { 046 executor.shutdown(); 047 try { 048 if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { 049 LOG.warn("Executor did not terminate gracefully, forcing shutdown"); 050 executor.shutdownNow(); 051 if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { 052 LOG.error("Executor did not terminate after forced shutdown"); 053 } 054 } 055 } catch (InterruptedException e) { 056 Thread.currentThread().interrupt(); 057 executor.shutdownNow(); 058 } 059 } 060 } 061 062 private static final Logger LOG = LoggerFactory.getLogger(StreamConverter.class); 063 private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; // 64KB buffer 064 private List<IStreamCommand> commands; 065 private ExecutionContext defaultContext; 066 067 /** 068 * Constructs a StreamConverter with the specified array of commands. 069 * 070 * @param commands the array of commands to be executed in sequence 071 * @throws NullPointerException if commands is null 072 * @throws IllegalArgumentException if commands is empty 073 */ 074 public StreamConverter(IStreamCommand[] commands) { 075 Objects.requireNonNull(commands, "commands cannot be null"); 076 if (commands.length == 0) { 077 throw new IllegalArgumentException("commands is empty."); 078 } 079 this.commands = List.of(commands); 080 } 081 082 /** 083 * Constructs a StreamConverter with the specified list of commands. 084 * 085 * @param commands the list of commands to be executed in sequence 086 * @throws NullPointerException if commands is null 087 * @throws IllegalArgumentException if commands is empty 088 */ 089 public StreamConverter(List<IStreamCommand> commands) { 090 Objects.requireNonNull(commands, "commands cannot be null"); 091 if (commands.isEmpty()) { 092 throw new IllegalArgumentException("commands is empty."); 093 } 094 this.commands = new ArrayList<>(commands); // Defensive copy 095 } 096 097 /** 098 * Creates a StreamConverter with the specified array of commands. 099 * 100 * @param commands the array of commands to be executed in sequence 101 * @return a new StreamConverter instance 102 * @throws NullPointerException if commands is null 103 * @throws IllegalArgumentException if commands is empty 104 */ 105 public static StreamConverter create(IStreamCommand... commands) { 106 return new StreamConverter(commands); 107 } 108 109 /** 110 * Creates a StreamConverter with the specified list of commands. 111 * 112 * @param commands the list of commands to be executed in sequence 113 * @return a new StreamConverter instance 114 * @throws NullPointerException if commands is null 115 * @throws IllegalArgumentException if commands is empty 116 */ 117 public static StreamConverter create(List<IStreamCommand> commands) { 118 return new StreamConverter(commands); 119 } 120 121 /** 122 * Creates a StreamConverter with a custom ExecutionContext and specified commands. All subsequent 123 * runs will use the provided context for MDC synchronization. 124 * 125 * @param context the execution context to use for MDC and logging 126 * @param commands the array of commands to be executed in sequence 127 * @return a new StreamConverter instance 128 * @throws NullPointerException if context or commands is null 129 * @throws IllegalArgumentException if commands is empty 130 */ 131 public static StreamConverter createWithContext( 132 ExecutionContext context, IStreamCommand... commands) { 133 Objects.requireNonNull(context, "context cannot be null"); 134 StreamConverter converter = new StreamConverter(commands); 135 converter.defaultContext = context; 136 return converter; 137 } 138 139 /** 140 * Creates a StreamConverter with a custom ExecutionContext and specified commands list. All 141 * subsequent runs will use the provided context for MDC synchronization. 142 * 143 * @param context the execution context to use for MDC and logging 144 * @param commands the list of commands to be executed in sequence 145 * @return a new StreamConverter instance 146 * @throws NullPointerException if context or commands is null 147 * @throws IllegalArgumentException if commands is empty 148 */ 149 public static StreamConverter createWithContext( 150 ExecutionContext context, List<IStreamCommand> commands) { 151 Objects.requireNonNull(context, "context cannot be null"); 152 StreamConverter converter = new StreamConverter(commands); 153 converter.defaultContext = context; 154 return converter; 155 } 156 157 /** 158 * Creates an optimal executor service based on available system resources and command count. 159 * 160 * @return an optimally configured ExecutorService 161 */ 162 private ExecutorService createOptimalExecutor() { 163 int availableCores = Runtime.getRuntime().availableProcessors(); 164 int optimalSize = Math.min(this.commands.size(), Math.max(2, availableCores)); 165 return Executors.newFixedThreadPool(optimalSize); 166 } 167 168 /** 169 * 非同期並列処理でストリームを変換する。 メモリ効率を重視し、PipedStreamを使用して大容量ファイルに対応。 自動的にExecutionContextを生成してMDC同期を実現する。 170 * 171 * @param inputStream 処理対象の入力ストリーム 172 * @param outputStream 処理結果を書き込む出力ストリーム 173 * @return 各コマンドの実行結果リスト 174 * @throws IOException ストリーム処理中にI/Oエラーが発生した場合 175 */ 176 public List<CommandResult> run(InputStream inputStream, OutputStream outputStream) 177 throws IOException { 178 // デフォルトコンテキストがあればそれを使用、なければ自動生成 179 ExecutionContext contextToUse = 180 defaultContext != null ? defaultContext : ExecutionContext.create(); 181 return run(inputStream, outputStream, contextToUse); 182 } 183 184 /** 185 * カスタムExecutionContextを使用してストリームを変換する。 マルチスレッド環境でのMDCコンテキスト伝播とログトレーサビリティを実現。 186 * 187 * @param inputStream 処理対象の入力ストリーム 188 * @param outputStream 処理結果を書き込む出力ストリーム 189 * @param context 実行コンテキスト 190 * @return 各コマンドの実行結果リスト 191 * @throws IOException ストリーム処理中にI/Oエラーが発生した場合 192 */ 193 public List<CommandResult> run( 194 InputStream inputStream, OutputStream outputStream, ExecutionContext context) 195 throws IOException { 196 Objects.requireNonNull(inputStream); 197 Objects.requireNonNull(outputStream); 198 Objects.requireNonNull(context, "context cannot be null"); 199 200 // パイプライン開始時にMDCコンテキストを設定 201 context.applyToMDCWithStage("pipeline-start"); 202 203 if (LOG.isInfoEnabled()) { 204 LOG.info( 205 "Starting StreamConverter with {} commands (executionId: {})", 206 commands.size(), 207 context.getExecutionId()); 208 } 209 210 // PipedStreamで並行処理(MDC対応) 211 return executeMultipleCommandsWithMDC(inputStream, outputStream, context); 212 } 213 214 /** コマンド(単一または複数)をMDC同期付きで並列実行 */ 215 private List<CommandResult> executeMultipleCommandsWithMDC( 216 InputStream inputStream, OutputStream outputStream, ExecutionContext context) 217 throws IOException { 218 List<CompletableFuture<CommandResult>> futures = new ArrayList<>(); 219 List<AutoCloseable> resources = new ArrayList<>(); 220 221 try (AutoCloseableExecutorService executor = 222 new AutoCloseableExecutorService(createOptimalExecutor())) { 223 InputStream currentInput = inputStream; 224 225 // パイプライン構築(MDC対応) 226 for (int i = 0; i < this.commands.size(); i++) { 227 IStreamCommand command = this.commands.get(i); 228 final int commandIndex = i; // Lambda用のfinal変数 229 230 final InputStream commandInput = currentInput; 231 final OutputStream commandOutput; 232 233 if (i == this.commands.size() - 1) { 234 // 最後のコマンド 235 commandOutput = outputStream; 236 } else { 237 // 中間コマンド: 次のコマンド用にPipedStreamペア作成 238 PipedOutputStream pipedOut = new PipedOutputStream(); 239 PipedInputStream pipedIn = new PipedInputStream(pipedOut, DEFAULT_BUFFER_SIZE); 240 resources.add(pipedOut); 241 resources.add(pipedIn); 242 commandOutput = pipedOut; 243 currentInput = pipedIn; 244 } 245 246 // 各コマンドを非同期実行(MDC同期付き) 247 CompletableFuture<CommandResult> future = 248 executor.supplyAsync( 249 () -> { 250 // スレッド固有のMDC設定 251 int sequence = context.getNextCommandSequence(); 252 String stageName = command.getClass().getSimpleName() + "-" + sequence; 253 context.applyToMDCWithStage(stageName); 254 255 if (LOG.isInfoEnabled()) { 256 LOG.info( 257 "Setting up command {} of {}: {} (sequence: {})", 258 commandIndex + 1, 259 commands.size(), 260 command.getClass().getSimpleName(), 261 sequence); 262 } 263 264 long startTime = System.currentTimeMillis(); 265 java.time.Instant startInstant = java.time.Instant.now(); 266 267 try { 268 // コマンド実行(MDCは自動的に利用可能) 269 command.execute(commandInput, commandOutput); 270 271 // 中間の PipedOutputStream は実行完了後にクローズする必要がある 272 if (commandOutput instanceof PipedOutputStream) { 273 commandOutput.close(); 274 } 275 276 long endTime = System.currentTimeMillis(); 277 java.time.Instant endInstant = java.time.Instant.now(); 278 279 if (LOG.isInfoEnabled()) { 280 LOG.info( 281 "Completed command: {} (sequence: {})", 282 command.getClass().getSimpleName(), 283 sequence); 284 } 285 286 return CommandResult.success( 287 command.getClass().getSimpleName(), 288 endTime - startTime, 289 0L, // 入力バイト数 290 0L, // 出力バイト数 291 startInstant, 292 endInstant); 293 294 } catch (IOException e) { 295 long endTime = System.currentTimeMillis(); 296 java.time.Instant endInstant = java.time.Instant.now(); 297 298 if (LOG.isErrorEnabled()) { 299 LOG.error( 300 "Command execution failed: {} (sequence: {}) - {}", 301 command.getClass().getSimpleName(), 302 sequence, 303 e.getMessage(), 304 e); 305 } 306 307 return CommandResult.failure( 308 command.getClass().getSimpleName(), 309 endTime - startTime, 310 e.getMessage(), 311 startInstant, 312 endInstant); 313 } 314 }); 315 316 futures.add(future); 317 } 318 319 // すべてのタスクの完了を待機 320 List<CommandResult> results = new ArrayList<>(); 321 for (CompletableFuture<CommandResult> future : futures) { 322 try { 323 // タイムアウト付きで待機(デッドロック防止) 324 CommandResult result = future.get(60, TimeUnit.SECONDS); 325 results.add(result); 326 327 // 失敗した場合は例外をスロー 328 if (!result.isSuccess()) { 329 throw new IOException("Command execution failed: " + result.getErrorMessage()); 330 } 331 332 } catch (ExecutionException e) { 333 Throwable cause = e.getCause(); 334 if (cause instanceof IOException) { 335 throw (IOException) cause; 336 } else if (cause instanceof RuntimeException) { 337 throw (RuntimeException) cause; 338 } 339 throw new IOException("Unexpected error during command execution", cause); 340 } catch (InterruptedException e) { 341 Thread.currentThread().interrupt(); 342 throw new IOException("Command execution was interrupted", e); 343 } catch (TimeoutException e) { 344 throw new IOException("Command execution timed out after 60 seconds", e); 345 } 346 } 347 348 if (LOG.isInfoEnabled()) { 349 LOG.info("All commands completed successfully (executionId: {})", context.getExecutionId()); 350 } 351 return results; 352 353 } finally { 354 // リソースクリーンアップ 355 closeResources(resources); 356 } 357 } 358 359 /** 360 * 使用したリソースを安全にクローズする 361 * 362 * @param resources クローズ対象のリソースリスト 363 */ 364 private void closeResources(List<AutoCloseable> resources) { 365 for (AutoCloseable resource : resources) { 366 try { 367 resource.close(); 368 } catch (Exception e) { 369 if (LOG.isWarnEnabled()) { 370 LOG.warn("Failed to close resource: {}", e.getMessage()); 371 } 372 } 373 } 374 } 375}