001package com.streamconverter;
002
003import com.streamconverter.command.IStreamCommand;
004import java.io.IOException;
005import java.io.InputStream;
006import java.io.OutputStream;
007import java.util.ArrayList;
008import java.util.List;
009import java.util.Objects;
010import java.util.concurrent.CompletableFuture;
011import java.util.concurrent.ExecutionException;
012import java.util.concurrent.ExecutorService;
013import java.util.concurrent.Executors;
014import java.util.concurrent.ThreadFactory;
015import java.util.concurrent.TimeUnit;
016import org.slf4j.Logger;
017import org.slf4j.LoggerFactory;
018
019/**
020 * ストリーム変換クラス。
021 *
022 * <p>ストリームを変換するクラス。ストリームを変換するコマンドを指定して、ストリームを変換する。
023 *
024 * <p>ストリームを変換するコマンドは、IStreamCommandインターフェースを実装したクラスである必要がある。
025 *
026 * <p>親スレッドのMDCコンテキストは、{@link com.streamconverter.logging.InheritableMDCAdapter}が
027 * インストールされている場合、各コマンドの仮想スレッドに自動的に伝搬される。{@link
028 * com.streamconverter.logging.MDCInitializer#initialize()}が呼ばれていない場合、各ワーカースレッドは
029 * 独立した空のMDCコンテキストを持ち、親スレッドのMDC値は伝搬されない。
030 */
031public class StreamConverter {
032
033  /** AutoCloseableラッパーでExecutorServiceのリソース管理を改善 */
034  private static class AutoCloseableExecutorService implements AutoCloseable {
035    private final ExecutorService executor;
036
037    public AutoCloseableExecutorService(ExecutorService executor) {
038      this.executor = executor;
039    }
040
041    public CompletableFuture<Void> runAsync(Runnable runnable) {
042      return CompletableFuture.runAsync(runnable, executor);
043    }
044
045    @Override
046    public void close() {
047      executor.shutdown();
048      try {
049        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
050          LOG.warn("Executor did not terminate gracefully, forcing shutdown");
051          executor.shutdownNow();
052          if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
053            LOG.error("Executor did not terminate after forced shutdown");
054          }
055        }
056      } catch (InterruptedException e) {
057        Thread.currentThread().interrupt();
058        executor.shutdownNow();
059      }
060    }
061  }
062
063  private static final Logger LOG = LoggerFactory.getLogger(StreamConverter.class);
064  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; // 64KB buffer
065  private final CommandStageRunner commandStageRunner;
066  private final PipelineCompletionMonitor pipelineCompletionMonitor;
067  private final PipelineFailureHandler pipelineFailureHandler;
068  private final PipelineWiring pipelineWiring;
069  private List<IStreamCommand> commands;
070  private final List<String> commandLabels;
071
072  private StreamConverter(List<IStreamCommand> commands) {
073    this.commandLabels = commands.stream().map(IStreamCommand::commandName).toList();
074    this.commands = wrapWithLogging(commands);
075    this.commandStageRunner = new CommandStageRunner();
076    this.pipelineFailureHandler = new PipelineFailureHandler();
077    this.pipelineCompletionMonitor = new PipelineCompletionMonitor();
078    this.pipelineWiring = new PipelineWiring(DEFAULT_BUFFER_SIZE);
079  }
080
081  /**
082   * Creates a StreamConverter with the specified array of commands.
083   *
084   * @param commands the array of commands to be executed in sequence
085   * @return a new StreamConverter instance
086   * @throws NullPointerException if commands is null
087   * @throws IllegalArgumentException if commands is empty
088   */
089  public static StreamConverter create(IStreamCommand... commands) {
090    Objects.requireNonNull(commands, "commands cannot be null");
091    if (commands.length == 0) {
092      throw new IllegalArgumentException("commands is empty.");
093    }
094    return new StreamConverter(List.of(commands));
095  }
096
097  /**
098   * Creates a StreamConverter with the specified list of commands.
099   *
100   * @param commands the list of commands to be executed in sequence
101   * @return a new StreamConverter instance
102   * @throws NullPointerException if commands is null
103   * @throws IllegalArgumentException if commands is empty
104   */
105  public static StreamConverter create(List<IStreamCommand> commands) {
106    Objects.requireNonNull(commands, "commands cannot be null");
107    if (commands.isEmpty()) {
108      throw new IllegalArgumentException("commands is empty.");
109    }
110    return new StreamConverter(List.copyOf(commands));
111  }
112
113  /** コマンドリストの各コマンドに withLogging をあらかじめ適用して返す。 ラッピングはコンストラクト時に1度だけ行われ、実行ごとのオーバーヘッドを排除する。 */
114  private static List<IStreamCommand> wrapWithLogging(List<IStreamCommand> commands) {
115    List<IStreamCommand> wrapped = new ArrayList<>(commands.size());
116    for (IStreamCommand command : commands) {
117      wrapped.add(command.withLogging(LOG));
118    }
119    return wrapped;
120  }
121
122  static boolean isPipeAbortedCause(Throwable cause) {
123    return PipelineFailureHandler.isPipeAbortedCause(cause);
124  }
125
126  /**
127   * Creates an optimal executor service based on available system resources and command count.
128   *
129   * @return an optimally configured ExecutorService
130   */
131  private ExecutorService createOptimalExecutor() {
132    ThreadFactory threadFactory = Thread.ofVirtual().name("stream-converter-", 0).factory();
133    return Executors.newThreadPerTaskExecutor(threadFactory);
134  }
135
136  /**
137   * 非同期並列処理でストリームを変換する。 メモリ効率を重視し、PipedStreamを使用して大容量ファイルに対応。
138   *
139   * @param inputStream 処理対象の入力ストリーム
140   * @param outputStream 処理結果を書き込む出力ストリーム
141   * @throws IOException ストリーム処理中にI/Oエラーが発生した場合
142   */
143  public void run(InputStream inputStream, OutputStream outputStream) throws IOException {
144    Objects.requireNonNull(inputStream);
145    Objects.requireNonNull(outputStream);
146
147    if (LOG.isInfoEnabled()) {
148      LOG.info("Starting StreamConverter with {} commands", commands.size());
149    }
150
151    executeCommands(inputStream, outputStream);
152
153    if (LOG.isInfoEnabled()) {
154      LOG.info("Completed StreamConverter pipeline");
155    }
156  }
157
158  /** コマンド(単一または複数)を並列実行 */
159  private void executeCommands(InputStream inputStream, OutputStream outputStream)
160      throws IOException {
161    List<CompletableFuture<Void>> futures = new ArrayList<>();
162    // 非同期実行側は配線済みの入出力だけを消費できるように、先にステージIOグラフを構築する。
163    PipelinePlan plan = pipelineWiring.build(commands.size(), inputStream, outputStream);
164    List<AutoCloseable> resources = new ArrayList<>(plan.resources());
165
166    try (AutoCloseableExecutorService executor =
167        new AutoCloseableExecutorService(createOptimalExecutor())) {
168      // 各コマンドを配線済みの input/output stream に接続して起動する。
169      futures.addAll(
170          commandStageRunner.startAll(commands, commandLabels, plan, executor::runAsync));
171
172      // パイプライン構築完了後に exceptionally ハンドラを登録する。
173      // これにより resources リストへの add が全て終わった後にワーカーからクローズが呼ばれることが保証される。
174      // いずれかのコマンドが失敗したら、すべてのパイプをクローズして
175      // ブロック中の書き込みを IOException で即座に解放する。
176      for (CompletableFuture<Void> future : futures) {
177        future.exceptionally(
178            t -> {
179              try {
180                closeResources(resources);
181              } catch (RuntimeException ex) {
182                LOG.error("Unexpected error during resource cleanup on pipeline failure", ex);
183              }
184              return null;
185            });
186      }
187
188      try {
189        // 全タスクの完了を待機してから根本原因を収集する。
190        // abort() による物理クローズで各コマンドが順不同で完了するため、
191        // 全完了後に走査することでレースコンディションを回避する。
192        pipelineCompletionMonitor.await(futures);
193      } catch (ExecutionException e) {
194        // abort 由来の二次的な失敗が出揃ったあとで、根本原因として返す失敗を解釈する。
195        pipelineFailureHandler.rethrowExecutionFailure(e, futures);
196      }
197
198      if (LOG.isInfoEnabled()) {
199        LOG.info("All commands completed successfully");
200      }
201
202    } finally {
203      // リソースクリーンアップ
204      closeResources(resources);
205    }
206  }
207
208  /**
209   * Closes resources in order, logging and continuing if individual closes fail.
210   *
211   * @param resources resources associated with the current pipeline execution
212   */
213  private void closeResources(List<AutoCloseable> resources) {
214    for (AutoCloseable resource : resources) {
215      try {
216        resource.close();
217      } catch (Exception e) {
218        LOG.warn("Failed to close resource [{}]", resource.getClass().getSimpleName(), e);
219      }
220    }
221  }
222}