001package com.streamconverter; 002 003import com.streamconverter.command.IStreamCommand; 004import java.io.IOException; 005import java.io.InputStream; 006import java.io.OutputStream; 007import java.util.ArrayList; 008import java.util.List; 009import java.util.Objects; 010import java.util.concurrent.CompletableFuture; 011import java.util.concurrent.ExecutionException; 012import java.util.concurrent.ExecutorService; 013import java.util.concurrent.Executors; 014import java.util.concurrent.ThreadFactory; 015import java.util.concurrent.TimeUnit; 016import org.slf4j.Logger; 017import org.slf4j.LoggerFactory; 018 019/** 020 * ストリーム変換クラス。 021 * 022 * <p>ストリームを変換するクラス。ストリームを変換するコマンドを指定して、ストリームを変換する。 023 * 024 * <p>ストリームを変換するコマンドは、IStreamCommandインターフェースを実装したクラスである必要がある。 025 * 026 * <p>親スレッドのMDCコンテキストは、{@link com.streamconverter.logging.InheritableMDCAdapter}が 027 * インストールされている場合、各コマンドの仮想スレッドに自動的に伝搬される。{@link 028 * com.streamconverter.logging.MDCInitializer#initialize()}が呼ばれていない場合、各ワーカースレッドは 029 * 独立した空のMDCコンテキストを持ち、親スレッドのMDC値は伝搬されない。 030 */ 031public class StreamConverter { 032 033 /** AutoCloseableラッパーでExecutorServiceのリソース管理を改善 */ 034 private static class AutoCloseableExecutorService implements AutoCloseable { 035 private final ExecutorService executor; 036 037 public AutoCloseableExecutorService(ExecutorService executor) { 038 this.executor = executor; 039 } 040 041 public CompletableFuture<Void> runAsync(Runnable runnable) { 042 return CompletableFuture.runAsync(runnable, executor); 043 } 044 045 @Override 046 public void close() { 047 executor.shutdown(); 048 try { 049 if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { 050 LOG.warn("Executor did not terminate gracefully, forcing shutdown"); 051 executor.shutdownNow(); 052 if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { 053 LOG.error("Executor did not terminate after forced shutdown"); 054 } 055 } 056 } catch (InterruptedException e) { 057 Thread.currentThread().interrupt(); 058 executor.shutdownNow(); 059 } 060 } 061 } 062 063 private static final Logger LOG = LoggerFactory.getLogger(StreamConverter.class); 064 private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; // 64KB buffer 065 private final CommandStageRunner commandStageRunner; 066 private final PipelineCompletionMonitor pipelineCompletionMonitor; 067 private final PipelineFailureHandler pipelineFailureHandler; 068 private final PipelineWiring pipelineWiring; 069 private List<IStreamCommand> commands; 070 private final List<String> commandLabels; 071 072 private StreamConverter(List<IStreamCommand> commands) { 073 this.commandLabels = commands.stream().map(IStreamCommand::commandName).toList(); 074 this.commands = wrapWithLogging(commands); 075 this.commandStageRunner = new CommandStageRunner(); 076 this.pipelineFailureHandler = new PipelineFailureHandler(); 077 this.pipelineCompletionMonitor = new PipelineCompletionMonitor(); 078 this.pipelineWiring = new PipelineWiring(DEFAULT_BUFFER_SIZE); 079 } 080 081 /** 082 * Creates a StreamConverter with the specified array of commands. 083 * 084 * @param commands the array of commands to be executed in sequence 085 * @return a new StreamConverter instance 086 * @throws NullPointerException if commands is null 087 * @throws IllegalArgumentException if commands is empty 088 */ 089 public static StreamConverter create(IStreamCommand... commands) { 090 Objects.requireNonNull(commands, "commands cannot be null"); 091 if (commands.length == 0) { 092 throw new IllegalArgumentException("commands is empty."); 093 } 094 return new StreamConverter(List.of(commands)); 095 } 096 097 /** 098 * Creates a StreamConverter with the specified list of commands. 099 * 100 * @param commands the list of commands to be executed in sequence 101 * @return a new StreamConverter instance 102 * @throws NullPointerException if commands is null 103 * @throws IllegalArgumentException if commands is empty 104 */ 105 public static StreamConverter create(List<IStreamCommand> commands) { 106 Objects.requireNonNull(commands, "commands cannot be null"); 107 if (commands.isEmpty()) { 108 throw new IllegalArgumentException("commands is empty."); 109 } 110 return new StreamConverter(List.copyOf(commands)); 111 } 112 113 /** コマンドリストの各コマンドに withLogging をあらかじめ適用して返す。 ラッピングはコンストラクト時に1度だけ行われ、実行ごとのオーバーヘッドを排除する。 */ 114 private static List<IStreamCommand> wrapWithLogging(List<IStreamCommand> commands) { 115 List<IStreamCommand> wrapped = new ArrayList<>(commands.size()); 116 for (IStreamCommand command : commands) { 117 wrapped.add(command.withLogging(LOG)); 118 } 119 return wrapped; 120 } 121 122 static boolean isPipeAbortedCause(Throwable cause) { 123 return PipelineFailureHandler.isPipeAbortedCause(cause); 124 } 125 126 /** 127 * Creates an optimal executor service based on available system resources and command count. 128 * 129 * @return an optimally configured ExecutorService 130 */ 131 private ExecutorService createOptimalExecutor() { 132 ThreadFactory threadFactory = Thread.ofVirtual().name("stream-converter-", 0).factory(); 133 return Executors.newThreadPerTaskExecutor(threadFactory); 134 } 135 136 /** 137 * 非同期並列処理でストリームを変換する。 メモリ効率を重視し、PipedStreamを使用して大容量ファイルに対応。 138 * 139 * @param inputStream 処理対象の入力ストリーム 140 * @param outputStream 処理結果を書き込む出力ストリーム 141 * @throws IOException ストリーム処理中にI/Oエラーが発生した場合 142 */ 143 public void run(InputStream inputStream, OutputStream outputStream) throws IOException { 144 Objects.requireNonNull(inputStream); 145 Objects.requireNonNull(outputStream); 146 147 if (LOG.isInfoEnabled()) { 148 LOG.info("Starting StreamConverter with {} commands", commands.size()); 149 } 150 151 executeCommands(inputStream, outputStream); 152 153 if (LOG.isInfoEnabled()) { 154 LOG.info("Completed StreamConverter pipeline"); 155 } 156 } 157 158 /** コマンド(単一または複数)を並列実行 */ 159 private void executeCommands(InputStream inputStream, OutputStream outputStream) 160 throws IOException { 161 List<CompletableFuture<Void>> futures = new ArrayList<>(); 162 // 非同期実行側は配線済みの入出力だけを消費できるように、先にステージIOグラフを構築する。 163 PipelinePlan plan = pipelineWiring.build(commands.size(), inputStream, outputStream); 164 List<AutoCloseable> resources = new ArrayList<>(plan.resources()); 165 166 try (AutoCloseableExecutorService executor = 167 new AutoCloseableExecutorService(createOptimalExecutor())) { 168 // 各コマンドを配線済みの input/output stream に接続して起動する。 169 futures.addAll( 170 commandStageRunner.startAll(commands, commandLabels, plan, executor::runAsync)); 171 172 // パイプライン構築完了後に exceptionally ハンドラを登録する。 173 // これにより resources リストへの add が全て終わった後にワーカーからクローズが呼ばれることが保証される。 174 // いずれかのコマンドが失敗したら、すべてのパイプをクローズして 175 // ブロック中の書き込みを IOException で即座に解放する。 176 for (CompletableFuture<Void> future : futures) { 177 future.exceptionally( 178 t -> { 179 try { 180 closeResources(resources); 181 } catch (RuntimeException ex) { 182 LOG.error("Unexpected error during resource cleanup on pipeline failure", ex); 183 } 184 return null; 185 }); 186 } 187 188 try { 189 // 全タスクの完了を待機してから根本原因を収集する。 190 // abort() による物理クローズで各コマンドが順不同で完了するため、 191 // 全完了後に走査することでレースコンディションを回避する。 192 pipelineCompletionMonitor.await(futures); 193 } catch (ExecutionException e) { 194 // abort 由来の二次的な失敗が出揃ったあとで、根本原因として返す失敗を解釈する。 195 pipelineFailureHandler.rethrowExecutionFailure(e, futures); 196 } 197 198 if (LOG.isInfoEnabled()) { 199 LOG.info("All commands completed successfully"); 200 } 201 202 } finally { 203 // リソースクリーンアップ 204 closeResources(resources); 205 } 206 } 207 208 /** 209 * Closes resources in order, logging and continuing if individual closes fail. 210 * 211 * @param resources resources associated with the current pipeline execution 212 */ 213 private void closeResources(List<AutoCloseable> resources) { 214 for (AutoCloseable resource : resources) { 215 try { 216 resource.close(); 217 } catch (Exception e) { 218 LOG.warn("Failed to close resource [{}]", resource.getClass().getSimpleName(), e); 219 } 220 } 221 } 222}