001package com.streamconverter; 002 003import com.streamconverter.command.IStreamCommand; 004import com.streamconverter.context.ExecutionContext; 005import java.io.IOException; 006import java.io.InputStream; 007import java.io.OutputStream; 008import java.io.PipedInputStream; 009import java.io.PipedOutputStream; 010import java.util.ArrayList; 011import java.util.List; 012import java.util.Objects; 013import java.util.concurrent.CompletableFuture; 014import java.util.concurrent.ExecutionException; 015import java.util.concurrent.ExecutorService; 016import java.util.concurrent.Executors; 017import java.util.concurrent.ThreadFactory; 018import java.util.concurrent.TimeUnit; 019import java.util.concurrent.TimeoutException; 020import org.slf4j.Logger; 021import org.slf4j.LoggerFactory; 022 023/** 024 * ストリーム変換クラス。 025 * 026 * <p>ストリームを変換するクラス。ストリームを変換するコマンドを指定して、ストリームを変換する。 027 * 028 * <p>ストリームを変換するコマンドは、IStreamCommandインターフェースを実装したクラスである必要がある。 029 */ 030public class StreamConverter { 031 032 /** AutoCloseableラッパーでExecutorServiceのリソース管理を改善 */ 033 private static class AutoCloseableExecutorService implements AutoCloseable { 034 private final ExecutorService executor; 035 036 public AutoCloseableExecutorService(ExecutorService executor) { 037 this.executor = executor; 038 } 039 040 public CompletableFuture<CommandResult> supplyAsync( 041 java.util.function.Supplier<CommandResult> supplier) { 042 return CompletableFuture.supplyAsync(supplier, executor); 043 } 044 045 @Override 046 public void close() { 047 executor.shutdown(); 048 try { 049 if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { 050 LOG.warn("Executor did not terminate gracefully, forcing shutdown"); 051 executor.shutdownNow(); 052 if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { 053 LOG.error("Executor did not terminate after forced shutdown"); 054 } 055 } 056 } catch (InterruptedException e) { 057 Thread.currentThread().interrupt(); 058 executor.shutdownNow(); 059 } 060 } 061 } 062 063 private static final Logger LOG = LoggerFactory.getLogger(StreamConverter.class); 064 private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; // 64KB buffer 065 private List<IStreamCommand> commands; 066 private ExecutionContext defaultContext; 067 068 /** 069 * Constructs a StreamConverter with the specified array of commands. 070 * 071 * @param commands the array of commands to be executed in sequence 072 * @throws NullPointerException if commands is null 073 * @throws IllegalArgumentException if commands is empty 074 */ 075 public StreamConverter(IStreamCommand[] commands) { 076 Objects.requireNonNull(commands, "commands cannot be null"); 077 if (commands.length == 0) { 078 throw new IllegalArgumentException("commands is empty."); 079 } 080 this.commands = List.of(commands); 081 } 082 083 /** 084 * Constructs a StreamConverter with the specified list of commands. 085 * 086 * @param commands the list of commands to be executed in sequence 087 * @throws NullPointerException if commands is null 088 * @throws IllegalArgumentException if commands is empty 089 */ 090 public StreamConverter(List<IStreamCommand> commands) { 091 Objects.requireNonNull(commands, "commands cannot be null"); 092 if (commands.isEmpty()) { 093 throw new IllegalArgumentException("commands is empty."); 094 } 095 this.commands = new ArrayList<>(commands); // Defensive copy 096 } 097 098 /** 099 * Creates a StreamConverter with the specified array of commands. 100 * 101 * @param commands the array of commands to be executed in sequence 102 * @return a new StreamConverter instance 103 * @throws NullPointerException if commands is null 104 * @throws IllegalArgumentException if commands is empty 105 */ 106 public static StreamConverter create(IStreamCommand... commands) { 107 return new StreamConverter(commands); 108 } 109 110 /** 111 * Creates a StreamConverter with the specified list of commands. 112 * 113 * @param commands the list of commands to be executed in sequence 114 * @return a new StreamConverter instance 115 * @throws NullPointerException if commands is null 116 * @throws IllegalArgumentException if commands is empty 117 */ 118 public static StreamConverter create(List<IStreamCommand> commands) { 119 return new StreamConverter(commands); 120 } 121 122 /** 123 * Creates a StreamConverter with a custom ExecutionContext and specified commands. All subsequent 124 * runs will use the provided context for MDC synchronization. 125 * 126 * @param context the execution context to use for MDC and logging 127 * @param commands the array of commands to be executed in sequence 128 * @return a new StreamConverter instance 129 * @throws NullPointerException if context or commands is null 130 * @throws IllegalArgumentException if commands is empty 131 */ 132 public static StreamConverter createWithContext( 133 ExecutionContext context, IStreamCommand... commands) { 134 Objects.requireNonNull(context, "context cannot be null"); 135 StreamConverter converter = new StreamConverter(commands); 136 converter.defaultContext = context; 137 return converter; 138 } 139 140 /** 141 * Creates a StreamConverter with a custom ExecutionContext and specified commands list. All 142 * subsequent runs will use the provided context for MDC synchronization. 143 * 144 * @param context the execution context to use for MDC and logging 145 * @param commands the list of commands to be executed in sequence 146 * @return a new StreamConverter instance 147 * @throws NullPointerException if context or commands is null 148 * @throws IllegalArgumentException if commands is empty 149 */ 150 public static StreamConverter createWithContext( 151 ExecutionContext context, List<IStreamCommand> commands) { 152 Objects.requireNonNull(context, "context cannot be null"); 153 StreamConverter converter = new StreamConverter(commands); 154 converter.defaultContext = context; 155 return converter; 156 } 157 158 /** 159 * Creates an optimal executor service based on available system resources and command count. 160 * 161 * @return an optimally configured ExecutorService 162 */ 163 private ExecutorService createOptimalExecutor() { 164 ThreadFactory threadFactory = Thread.ofVirtual().name("stream-converter-", 0).factory(); 165 return Executors.newThreadPerTaskExecutor(threadFactory); 166 } 167 168 /** 169 * 非同期並列処理でストリームを変換する。 メモリ効率を重視し、PipedStreamを使用して大容量ファイルに対応。 自動的にExecutionContextを生成してMDC同期を実現する。 170 * 171 * @param inputStream 処理対象の入力ストリーム 172 * @param outputStream 処理結果を書き込む出力ストリーム 173 * @return 各コマンドの実行結果リスト 174 * @throws IOException ストリーム処理中にI/Oエラーが発生した場合 175 */ 176 public List<CommandResult> run(InputStream inputStream, OutputStream outputStream) 177 throws IOException { 178 // デフォルトコンテキストがあればそれを使用、なければ自動生成 179 ExecutionContext contextToUse = 180 defaultContext != null ? defaultContext : ExecutionContext.create(); 181 return run(inputStream, outputStream, contextToUse); 182 } 183 184 /** 185 * カスタムExecutionContextを使用してストリームを変換する。 マルチスレッド環境でのMDCコンテキスト伝播とログトレーサビリティを実現。 186 * 187 * @param inputStream 処理対象の入力ストリーム 188 * @param outputStream 処理結果を書き込む出力ストリーム 189 * @param context 実行コンテキスト 190 * @return 各コマンドの実行結果リスト 191 * @throws IOException ストリーム処理中にI/Oエラーが発生した場合 192 */ 193 public List<CommandResult> run( 194 InputStream inputStream, OutputStream outputStream, ExecutionContext context) 195 throws IOException { 196 Objects.requireNonNull(inputStream); 197 Objects.requireNonNull(outputStream); 198 Objects.requireNonNull(context, "context cannot be null"); 199 200 // パイプライン開始時にMDCコンテキストを設定 201 context.applyToMDCWithStage("pipeline-start"); 202 203 if (LOG.isInfoEnabled()) { 204 LOG.info( 205 "Starting StreamConverter with {} commands (executionId: {})", 206 commands.size(), 207 context.getExecutionId()); 208 } 209 210 // PipedStreamで並行処理(MDC対応) 211 return executeMultipleCommandsWithMDC(inputStream, outputStream, context); 212 } 213 214 /** コマンド(単一または複数)をMDC同期付きで並列実行 */ 215 private List<CommandResult> executeMultipleCommandsWithMDC( 216 InputStream inputStream, OutputStream outputStream, ExecutionContext context) 217 throws IOException { 218 List<CompletableFuture<CommandResult>> futures = new ArrayList<>(); 219 List<AutoCloseable> resources = new ArrayList<>(); 220 221 try (AutoCloseableExecutorService executor = 222 new AutoCloseableExecutorService(createOptimalExecutor())) { 223 InputStream currentInput = inputStream; 224 225 // パイプライン構築(MDC対応) 226 for (int i = 0; i < this.commands.size(); i++) { 227 IStreamCommand command = this.commands.get(i); 228 final int commandIndex = i; // Lambda用のfinal変数 229 230 final InputStream commandInput = currentInput; 231 final OutputStream commandOutput; 232 233 if (i == this.commands.size() - 1) { 234 // 最後のコマンド 235 commandOutput = outputStream; 236 } else { 237 // 中間コマンド: 次のコマンド用にPipedStreamペア作成 238 PipedOutputStream pipedOut = new PipedOutputStream(); 239 PipedInputStream pipedIn = new PipedInputStream(pipedOut, DEFAULT_BUFFER_SIZE); 240 resources.add(pipedOut); 241 resources.add(pipedIn); 242 commandOutput = pipedOut; 243 currentInput = pipedIn; 244 } 245 246 // 各コマンドを非同期実行(MDC同期付き) 247 CompletableFuture<CommandResult> future = 248 executor.supplyAsync( 249 () -> { 250 // スレッド固有のMDC設定 251 int sequence = context.getNextCommandSequence(); 252 String stageName = command.getClass().getSimpleName() + "-" + sequence; 253 context.applyToMDCWithStage(stageName); 254 255 if (LOG.isInfoEnabled()) { 256 LOG.info( 257 "Setting up command {} of {}: {} (sequence: {})", 258 commandIndex + 1, 259 commands.size(), 260 command.getClass().getSimpleName(), 261 sequence); 262 } 263 264 long startTime = System.currentTimeMillis(); 265 java.time.Instant startInstant = java.time.Instant.now(); 266 267 try { 268 // コマンド実行(ExecutionContext付きで実行してMDC同期を有効化) 269 command.execute(commandInput, commandOutput, context); 270 271 // 中間の PipedOutputStream は実行完了後にクローズする必要がある 272 if (commandOutput instanceof PipedOutputStream) { 273 commandOutput.close(); 274 } 275 276 long endTime = System.currentTimeMillis(); 277 java.time.Instant endInstant = java.time.Instant.now(); 278 279 if (LOG.isInfoEnabled()) { 280 LOG.info( 281 "Completed command: {} (sequence: {})", 282 command.getClass().getSimpleName(), 283 sequence); 284 } 285 286 return CommandResult.success( 287 command.getClass().getSimpleName(), 288 endTime - startTime, 289 0L, // 入力バイト数 290 0L, // 出力バイト数 291 startInstant, 292 endInstant); 293 294 } catch (IOException e) { 295 long endTime = System.currentTimeMillis(); 296 java.time.Instant endInstant = java.time.Instant.now(); 297 298 if (LOG.isErrorEnabled()) { 299 LOG.error( 300 "Command execution failed: {} (sequence: {}) - {}", 301 command.getClass().getSimpleName(), 302 sequence, 303 e.getMessage(), 304 e); 305 } 306 307 return CommandResult.failure( 308 command.getClass().getSimpleName(), 309 endTime - startTime, 310 e.getMessage(), 311 startInstant, 312 endInstant); 313 } 314 }); 315 316 futures.add(future); 317 } 318 319 // すべてのタスクの完了を待機 320 List<CommandResult> results = new ArrayList<>(); 321 for (CompletableFuture<CommandResult> future : futures) { 322 try { 323 // タイムアウト付きで待機(デッドロック防止) 324 CommandResult result = future.get(60, TimeUnit.SECONDS); 325 results.add(result); 326 327 // 失敗した場合は例外をスロー 328 if (!result.isSuccess()) { 329 throw new IOException("Command execution failed: " + result.getErrorMessage()); 330 } 331 332 } catch (ExecutionException e) { 333 Throwable cause = e.getCause(); 334 if (cause instanceof IOException) { 335 throw (IOException) cause; 336 } else if (cause instanceof RuntimeException) { 337 throw (RuntimeException) cause; 338 } 339 throw new IOException("Unexpected error during command execution", cause); 340 } catch (InterruptedException e) { 341 Thread.currentThread().interrupt(); 342 throw new IOException("Command execution was interrupted", e); 343 } catch (TimeoutException e) { 344 throw new IOException("Command execution timed out after 60 seconds", e); 345 } 346 } 347 348 if (LOG.isInfoEnabled()) { 349 LOG.info("All commands completed successfully (executionId: {})", context.getExecutionId()); 350 } 351 return results; 352 353 } finally { 354 // リソースクリーンアップ 355 closeResources(resources); 356 } 357 } 358 359 /** 360 * 使用したリソースを安全にクローズする 361 * 362 * @param resources クローズ対象のリソースリスト 363 */ 364 private void closeResources(List<AutoCloseable> resources) { 365 for (AutoCloseable resource : resources) { 366 try { 367 resource.close(); 368 } catch (Exception e) { 369 if (LOG.isWarnEnabled()) { 370 LOG.warn("Failed to close resource: {}", e.getMessage()); 371 } 372 } 373 } 374 } 375}