001package com.streamconverter.examples;
002
003import com.streamconverter.StreamConverter;
004import com.streamconverter.command.IStreamCommand;
005import com.streamconverter.command.impl.csv.CsvNavigateCommand;
006import com.streamconverter.command.rule.MdcPropagatingRule;
007import com.streamconverter.command.rule.impl.string.TrimRule;
008import com.streamconverter.context.PipelineContext;
009import com.streamconverter.path.CSVPath;
010import java.io.ByteArrayInputStream;
011import java.io.ByteArrayOutputStream;
012import java.io.IOException;
013import java.io.InputStream;
014import java.nio.charset.StandardCharsets;
015import org.slf4j.Logger;
016import org.slf4j.LoggerFactory;
017
018/**
019 * 例3: PipelineContext によるスレッド間の値共有と MDC 伝搬
020 *
021 * <p>StreamConverter では各コマンドが別の仮想スレッドで並列実行される。 スレッドをまたいで値を共有するための仕組みが {@link PipelineContext} である。
022 *
023 * <p><b>この例で学べること:</b>
024 *
025 * <ul>
026 *   <li>複数コマンドは別スレッドで動くため、通常の変数で値を渡すことができない
027 *   <li>{@link PipelineContext#putShared(String, String)} でパイプライン内の全コマンドが参照できる値を登録できる
028 *   <li>{@link MdcPropagatingRule} は抽出した値を {@link PipelineContext} 経由で MDC に自動反映する
029 *   <li>後段コマンドのログにも前段で抽出した値が自動的に含まれる(TurboFilter による同期)
030 * </ul>
031 *
032 * <p><b>パイプライン構成(3段):</b>
033 *
034 * <pre>
035 * [コマンド1: ラムダ]
036 *   最初の1行から注文IDを読み取り PipelineContext.putShared("orderId", ...) に格納
037 *          ↓
038 * [コマンド2: CsvNavigateCommand + MdcPropagatingRule]
039 *   productName 列を抽出し、値を "productName" キーで MDC に自動伝搬
040 *   → このコマンドのログに orderId と productName が含まれる
041 *          ↓
042 * [コマンド3: CsvNavigateCommand + TrimRule]
043 *   address 列の前後空白をトリム
044 *   → このコマンドのログにも orderId が含まれる(PipelineContext 経由)
045 * </pre>
046 */
047public class PipelineContextExample {
048
049  private static final Logger log = LoggerFactory.getLogger(PipelineContextExample.class);
050
051  /**
052   * @param args コマンドライン引数(未使用)
053   * @throws IOException I/O エラー
054   */
055  public static void main(String[] args) throws IOException {
056    log.info("=== 例3: PipelineContext によるスレッド間値共有と MDC 伝搬 ===");
057
058    String csv =
059        "orderId,productName,address\n"
060            + "ORD-2026-0001,Laptop Computer,  Tokyo Japan  \n"
061            + "ORD-2026-0002,Wireless Mouse,  Osaka Japan  \n";
062
063    log.info("入力 CSV:\n{}", csv);
064
065    // --- コマンド1: ラムダで PipelineContext に注文IDを格納 ---
066    // 各コマンドは別スレッドで動くため、ローカル変数や static フィールドでは値を安全に共有できない。
067    // PipelineContext.putShared() を使うと、同じパイプライン内の全コマンドスレッドで値が共有される。
068    IStreamCommand extractOrderId =
069        (in, out) -> {
070          byte[] data = in.readAllBytes();
071          String content = new String(data, StandardCharsets.UTF_8);
072
073          // ヘッダーをスキップして最初のデータ行から orderId を抽出
074          String[] lines = content.split("\n");
075          if (lines.length > 1) {
076            String firstDataLine = lines[1];
077            String orderId = firstDataLine.split(",")[0].trim();
078            // putShared: 全コマンドスレッドで参照可能になり、呼び出しスレッドの MDC にも即反映
079            PipelineContext.putShared("orderId", orderId);
080            log.info("orderId を PipelineContext に格納: {}", orderId);
081          }
082
083          out.write(data);
084        };
085
086    // --- コマンド2: MdcPropagatingRule で productName を MDC に伝搬 ---
087    // MdcPropagatingRule は値をパススルーしながら PipelineContext.putShared() を呼び出す Rule。
088    // これにより、後続の全コマンドのログに productName が自動的に含まれる。
089    IStreamCommand propagateProductName =
090        CsvNavigateCommand.create(
091            CSVPath.of("productName"), MdcPropagatingRule.create("productName"));
092
093    // --- コマンド3: TrimRule で address をトリム ---
094    // このコマンドのログには orderId が含まれる。
095    // PipelineContextTurboFilter がログ出力直前に PipelineContext の共有値を MDC に同期するため。
096    IStreamCommand trimAddress = CsvNavigateCommand.create(CSVPath.of("address"), new TrimRule());
097
098    // --- パイプライン実行 ---
099    StreamConverter converter =
100        StreamConverter.create(extractOrderId, propagateProductName, trimAddress);
101
102    ByteArrayOutputStream output = new ByteArrayOutputStream();
103    try (InputStream inputStream = new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8))) {
104      converter.run(inputStream, output);
105    }
106
107    String result = output.toString(StandardCharsets.UTF_8);
108    String expected =
109        "orderId,productName,address\r\n"
110            + "ORD-2026-0001,Laptop Computer,Tokyo Japan\r\n"
111            + "ORD-2026-0002,Wireless Mouse,Osaka Japan\r\n";
112    log.info("期待値:\n{}", expected);
113    log.info("出力 CSV:\n{}", result);
114    log.info("ログの各行に orderId / productName が含まれていることを確認してください");
115  }
116}