001package com.streamconverter;
002
003import com.streamconverter.command.IStreamCommand;
004import com.streamconverter.context.ExecutionContext;
005import java.io.IOException;
006import java.io.InputStream;
007import java.io.OutputStream;
008import java.io.PipedInputStream;
009import java.io.PipedOutputStream;
010import java.util.ArrayList;
011import java.util.List;
012import java.util.Objects;
013import java.util.concurrent.CompletableFuture;
014import java.util.concurrent.ExecutionException;
015import java.util.concurrent.ExecutorService;
016import java.util.concurrent.Executors;
017import java.util.concurrent.TimeUnit;
018import java.util.concurrent.TimeoutException;
019import org.slf4j.Logger;
020import org.slf4j.LoggerFactory;
021
022/**
023 * ストリーム変換クラス。
024 *
025 * <p>ストリームを変換するクラス。ストリームを変換するコマンドを指定して、ストリームを変換する。
026 *
027 * <p>ストリームを変換するコマンドは、IStreamCommandインターフェースを実装したクラスである必要がある。
028 */
029public class StreamConverter {
030
031  /** AutoCloseableラッパーでExecutorServiceのリソース管理を改善 */
032  private static class AutoCloseableExecutorService implements AutoCloseable {
033    private final ExecutorService executor;
034
035    public AutoCloseableExecutorService(ExecutorService executor) {
036      this.executor = executor;
037    }
038
039    public CompletableFuture<CommandResult> supplyAsync(
040        java.util.function.Supplier<CommandResult> supplier) {
041      return CompletableFuture.supplyAsync(supplier, executor);
042    }
043
044    @Override
045    public void close() {
046      executor.shutdown();
047      try {
048        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
049          LOG.warn("Executor did not terminate gracefully, forcing shutdown");
050          executor.shutdownNow();
051          if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
052            LOG.error("Executor did not terminate after forced shutdown");
053          }
054        }
055      } catch (InterruptedException e) {
056        Thread.currentThread().interrupt();
057        executor.shutdownNow();
058      }
059    }
060  }
061
062  private static final Logger LOG = LoggerFactory.getLogger(StreamConverter.class);
063  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; // 64KB buffer
064  private List<IStreamCommand> commands;
065  private ExecutionContext defaultContext;
066
067  /**
068   * Constructs a StreamConverter with the specified array of commands.
069   *
070   * @param commands the array of commands to be executed in sequence
071   * @throws NullPointerException if commands is null
072   * @throws IllegalArgumentException if commands is empty
073   */
074  public StreamConverter(IStreamCommand[] commands) {
075    Objects.requireNonNull(commands, "commands cannot be null");
076    if (commands.length == 0) {
077      throw new IllegalArgumentException("commands is empty.");
078    }
079    this.commands = List.of(commands);
080  }
081
082  /**
083   * Constructs a StreamConverter with the specified list of commands.
084   *
085   * @param commands the list of commands to be executed in sequence
086   * @throws NullPointerException if commands is null
087   * @throws IllegalArgumentException if commands is empty
088   */
089  public StreamConverter(List<IStreamCommand> commands) {
090    Objects.requireNonNull(commands, "commands cannot be null");
091    if (commands.isEmpty()) {
092      throw new IllegalArgumentException("commands is empty.");
093    }
094    this.commands = new ArrayList<>(commands); // Defensive copy
095  }
096
097  /**
098   * Creates a StreamConverter with the specified array of commands.
099   *
100   * @param commands the array of commands to be executed in sequence
101   * @return a new StreamConverter instance
102   * @throws NullPointerException if commands is null
103   * @throws IllegalArgumentException if commands is empty
104   */
105  public static StreamConverter create(IStreamCommand... commands) {
106    return new StreamConverter(commands);
107  }
108
109  /**
110   * Creates a StreamConverter with the specified list of commands.
111   *
112   * @param commands the list of commands to be executed in sequence
113   * @return a new StreamConverter instance
114   * @throws NullPointerException if commands is null
115   * @throws IllegalArgumentException if commands is empty
116   */
117  public static StreamConverter create(List<IStreamCommand> commands) {
118    return new StreamConverter(commands);
119  }
120
121  /**
122   * Creates a StreamConverter with a custom ExecutionContext and specified commands. All subsequent
123   * runs will use the provided context for MDC synchronization.
124   *
125   * @param context the execution context to use for MDC and logging
126   * @param commands the array of commands to be executed in sequence
127   * @return a new StreamConverter instance
128   * @throws NullPointerException if context or commands is null
129   * @throws IllegalArgumentException if commands is empty
130   */
131  public static StreamConverter createWithContext(
132      ExecutionContext context, IStreamCommand... commands) {
133    Objects.requireNonNull(context, "context cannot be null");
134    StreamConverter converter = new StreamConverter(commands);
135    converter.defaultContext = context;
136    return converter;
137  }
138
139  /**
140   * Creates a StreamConverter with a custom ExecutionContext and specified commands list. All
141   * subsequent runs will use the provided context for MDC synchronization.
142   *
143   * @param context the execution context to use for MDC and logging
144   * @param commands the list of commands to be executed in sequence
145   * @return a new StreamConverter instance
146   * @throws NullPointerException if context or commands is null
147   * @throws IllegalArgumentException if commands is empty
148   */
149  public static StreamConverter createWithContext(
150      ExecutionContext context, List<IStreamCommand> commands) {
151    Objects.requireNonNull(context, "context cannot be null");
152    StreamConverter converter = new StreamConverter(commands);
153    converter.defaultContext = context;
154    return converter;
155  }
156
157  /**
158   * Creates an optimal executor service based on available system resources and command count.
159   *
160   * @return an optimally configured ExecutorService
161   */
162  private ExecutorService createOptimalExecutor() {
163    int availableCores = Runtime.getRuntime().availableProcessors();
164    int optimalSize = Math.min(this.commands.size(), Math.max(2, availableCores));
165    return Executors.newFixedThreadPool(optimalSize);
166  }
167
168  /**
169   * 非同期並列処理でストリームを変換する。 メモリ効率を重視し、PipedStreamを使用して大容量ファイルに対応。 自動的にExecutionContextを生成してMDC同期を実現する。
170   *
171   * @param inputStream 処理対象の入力ストリーム
172   * @param outputStream 処理結果を書き込む出力ストリーム
173   * @return 各コマンドの実行結果リスト
174   * @throws IOException ストリーム処理中にI/Oエラーが発生した場合
175   */
176  public List<CommandResult> run(InputStream inputStream, OutputStream outputStream)
177      throws IOException {
178    // デフォルトコンテキストがあればそれを使用、なければ自動生成
179    ExecutionContext contextToUse =
180        defaultContext != null ? defaultContext : ExecutionContext.create();
181    return run(inputStream, outputStream, contextToUse);
182  }
183
184  /**
185   * カスタムExecutionContextを使用してストリームを変換する。 マルチスレッド環境でのMDCコンテキスト伝播とログトレーサビリティを実現。
186   *
187   * @param inputStream 処理対象の入力ストリーム
188   * @param outputStream 処理結果を書き込む出力ストリーム
189   * @param context 実行コンテキスト
190   * @return 各コマンドの実行結果リスト
191   * @throws IOException ストリーム処理中にI/Oエラーが発生した場合
192   */
193  public List<CommandResult> run(
194      InputStream inputStream, OutputStream outputStream, ExecutionContext context)
195      throws IOException {
196    Objects.requireNonNull(inputStream);
197    Objects.requireNonNull(outputStream);
198    Objects.requireNonNull(context, "context cannot be null");
199
200    // パイプライン開始時にMDCコンテキストを設定
201    context.applyToMDCWithStage("pipeline-start");
202
203    if (LOG.isInfoEnabled()) {
204      LOG.info(
205          "Starting StreamConverter with {} commands (executionId: {})",
206          commands.size(),
207          context.getExecutionId());
208    }
209
210    // PipedStreamで並行処理(MDC対応)
211    return executeMultipleCommandsWithMDC(inputStream, outputStream, context);
212  }
213
214  /** コマンド(単一または複数)をMDC同期付きで並列実行 */
215  private List<CommandResult> executeMultipleCommandsWithMDC(
216      InputStream inputStream, OutputStream outputStream, ExecutionContext context)
217      throws IOException {
218    List<CompletableFuture<CommandResult>> futures = new ArrayList<>();
219    List<AutoCloseable> resources = new ArrayList<>();
220
221    try (AutoCloseableExecutorService executor =
222        new AutoCloseableExecutorService(createOptimalExecutor())) {
223      InputStream currentInput = inputStream;
224
225      // パイプライン構築(MDC対応)
226      for (int i = 0; i < this.commands.size(); i++) {
227        IStreamCommand command = this.commands.get(i);
228        final int commandIndex = i; // Lambda用のfinal変数
229
230        final InputStream commandInput = currentInput;
231        final OutputStream commandOutput;
232
233        if (i == this.commands.size() - 1) {
234          // 最後のコマンド
235          commandOutput = outputStream;
236        } else {
237          // 中間コマンド: 次のコマンド用にPipedStreamペア作成
238          PipedOutputStream pipedOut = new PipedOutputStream();
239          PipedInputStream pipedIn = new PipedInputStream(pipedOut, DEFAULT_BUFFER_SIZE);
240          resources.add(pipedOut);
241          resources.add(pipedIn);
242          commandOutput = pipedOut;
243          currentInput = pipedIn;
244        }
245
246        // 各コマンドを非同期実行(MDC同期付き)
247        CompletableFuture<CommandResult> future =
248            executor.supplyAsync(
249                () -> {
250                  // スレッド固有のMDC設定
251                  int sequence = context.getNextCommandSequence();
252                  String stageName = command.getClass().getSimpleName() + "-" + sequence;
253                  context.applyToMDCWithStage(stageName);
254
255                  if (LOG.isInfoEnabled()) {
256                    LOG.info(
257                        "Setting up command {} of {}: {} (sequence: {})",
258                        commandIndex + 1,
259                        commands.size(),
260                        command.getClass().getSimpleName(),
261                        sequence);
262                  }
263
264                  long startTime = System.currentTimeMillis();
265                  java.time.Instant startInstant = java.time.Instant.now();
266
267                  try {
268                    // コマンド実行(MDCは自動的に利用可能)
269                    command.execute(commandInput, commandOutput);
270
271                    // 中間の PipedOutputStream は実行完了後にクローズする必要がある
272                    if (commandOutput instanceof PipedOutputStream) {
273                      commandOutput.close();
274                    }
275
276                    long endTime = System.currentTimeMillis();
277                    java.time.Instant endInstant = java.time.Instant.now();
278
279                    if (LOG.isInfoEnabled()) {
280                      LOG.info(
281                          "Completed command: {} (sequence: {})",
282                          command.getClass().getSimpleName(),
283                          sequence);
284                    }
285
286                    return CommandResult.success(
287                        command.getClass().getSimpleName(),
288                        endTime - startTime,
289                        0L, // 入力バイト数
290                        0L, // 出力バイト数
291                        startInstant,
292                        endInstant);
293
294                  } catch (IOException e) {
295                    long endTime = System.currentTimeMillis();
296                    java.time.Instant endInstant = java.time.Instant.now();
297
298                    if (LOG.isErrorEnabled()) {
299                      LOG.error(
300                          "Command execution failed: {} (sequence: {}) - {}",
301                          command.getClass().getSimpleName(),
302                          sequence,
303                          e.getMessage(),
304                          e);
305                    }
306
307                    return CommandResult.failure(
308                        command.getClass().getSimpleName(),
309                        endTime - startTime,
310                        e.getMessage(),
311                        startInstant,
312                        endInstant);
313                  }
314                });
315
316        futures.add(future);
317      }
318
319      // すべてのタスクの完了を待機
320      List<CommandResult> results = new ArrayList<>();
321      for (CompletableFuture<CommandResult> future : futures) {
322        try {
323          // タイムアウト付きで待機(デッドロック防止)
324          CommandResult result = future.get(60, TimeUnit.SECONDS);
325          results.add(result);
326
327          // 失敗した場合は例外をスロー
328          if (!result.isSuccess()) {
329            throw new IOException("Command execution failed: " + result.getErrorMessage());
330          }
331
332        } catch (ExecutionException e) {
333          Throwable cause = e.getCause();
334          if (cause instanceof IOException) {
335            throw (IOException) cause;
336          } else if (cause instanceof RuntimeException) {
337            throw (RuntimeException) cause;
338          }
339          throw new IOException("Unexpected error during command execution", cause);
340        } catch (InterruptedException e) {
341          Thread.currentThread().interrupt();
342          throw new IOException("Command execution was interrupted", e);
343        } catch (TimeoutException e) {
344          throw new IOException("Command execution timed out after 60 seconds", e);
345        }
346      }
347
348      if (LOG.isInfoEnabled()) {
349        LOG.info("All commands completed successfully (executionId: {})", context.getExecutionId());
350      }
351      return results;
352
353    } finally {
354      // リソースクリーンアップ
355      closeResources(resources);
356    }
357  }
358
359  /**
360   * 使用したリソースを安全にクローズする
361   *
362   * @param resources クローズ対象のリソースリスト
363   */
364  private void closeResources(List<AutoCloseable> resources) {
365    for (AutoCloseable resource : resources) {
366      try {
367        resource.close();
368      } catch (Exception e) {
369        if (LOG.isWarnEnabled()) {
370          LOG.warn("Failed to close resource: {}", e.getMessage());
371        }
372      }
373    }
374  }
375}