001package com.streamconverter.examples; 002 003import com.streamconverter.StreamConverter; 004import com.streamconverter.command.IStreamCommand; 005import com.streamconverter.command.impl.csv.CsvNavigateCommand; 006import com.streamconverter.command.rule.MdcPropagatingRule; 007import com.streamconverter.command.rule.impl.string.TrimRule; 008import com.streamconverter.context.PipelineContext; 009import com.streamconverter.path.CSVPath; 010import java.io.ByteArrayInputStream; 011import java.io.ByteArrayOutputStream; 012import java.io.IOException; 013import java.io.InputStream; 014import java.nio.charset.StandardCharsets; 015import org.slf4j.Logger; 016import org.slf4j.LoggerFactory; 017 018/** 019 * 例3: PipelineContext によるスレッド間の値共有と MDC 伝搬 020 * 021 * <p>StreamConverter では各コマンドが別の仮想スレッドで並列実行される。 スレッドをまたいで値を共有するための仕組みが {@link PipelineContext} である。 022 * 023 * <p><b>この例で学べること:</b> 024 * 025 * <ul> 026 * <li>複数コマンドは別スレッドで動くため、通常の変数で値を渡すことができない 027 * <li>{@link PipelineContext#putShared(String, String)} でパイプライン内の全コマンドが参照できる値を登録できる 028 * <li>{@link MdcPropagatingRule} は抽出した値を {@link PipelineContext} 経由で MDC に自動反映する 029 * <li>後段コマンドのログにも前段で抽出した値が自動的に含まれる(TurboFilter による同期) 030 * </ul> 031 * 032 * <p><b>パイプライン構成(3段):</b> 033 * 034 * <pre> 035 * [コマンド1: ラムダ] 036 * 最初の1行から注文IDを読み取り PipelineContext.putShared("orderId", ...) に格納 037 * ↓ 038 * [コマンド2: CsvNavigateCommand + MdcPropagatingRule] 039 * productName 列を抽出し、値を "productName" キーで MDC に自動伝搬 040 * → このコマンドのログに orderId と productName が含まれる 041 * ↓ 042 * [コマンド3: CsvNavigateCommand + TrimRule] 043 * address 列の前後空白をトリム 044 * → このコマンドのログにも orderId が含まれる(PipelineContext 経由) 045 * </pre> 046 */ 047public class PipelineContextExample { 048 049 private static final Logger log = LoggerFactory.getLogger(PipelineContextExample.class); 050 051 /** 052 * @param args コマンドライン引数(未使用) 053 * @throws IOException I/O エラー 054 */ 055 public static void main(String[] args) throws IOException { 056 log.info("=== 例3: PipelineContext によるスレッド間値共有と MDC 伝搬 ==="); 057 058 String csv = 059 "orderId,productName,address\n" 060 + "ORD-2026-0001,Laptop Computer, Tokyo Japan \n" 061 + "ORD-2026-0002,Wireless Mouse, Osaka Japan \n"; 062 063 log.info("入力 CSV:\n{}", csv); 064 065 // --- コマンド1: ラムダで PipelineContext に注文IDを格納 --- 066 // 各コマンドは別スレッドで動くため、ローカル変数や static フィールドでは値を安全に共有できない。 067 // PipelineContext.putShared() を使うと、同じパイプライン内の全コマンドスレッドで値が共有される。 068 IStreamCommand extractOrderId = 069 (in, out) -> { 070 byte[] data = in.readAllBytes(); 071 String content = new String(data, StandardCharsets.UTF_8); 072 073 // ヘッダーをスキップして最初のデータ行から orderId を抽出 074 String[] lines = content.split("\n"); 075 if (lines.length > 1) { 076 String firstDataLine = lines[1]; 077 String orderId = firstDataLine.split(",")[0].trim(); 078 // putShared: 全コマンドスレッドで参照可能になり、呼び出しスレッドの MDC にも即反映 079 PipelineContext.putShared("orderId", orderId); 080 log.info("orderId を PipelineContext に格納: {}", orderId); 081 } 082 083 out.write(data); 084 }; 085 086 // --- コマンド2: MdcPropagatingRule で productName を MDC に伝搬 --- 087 // MdcPropagatingRule は値をパススルーしながら PipelineContext.putShared() を呼び出す Rule。 088 // これにより、後続の全コマンドのログに productName が自動的に含まれる。 089 IStreamCommand propagateProductName = 090 CsvNavigateCommand.create( 091 CSVPath.of("productName"), MdcPropagatingRule.create("productName")); 092 093 // --- コマンド3: TrimRule で address をトリム --- 094 // このコマンドのログには orderId が含まれる。 095 // PipelineContextTurboFilter がログ出力直前に PipelineContext の共有値を MDC に同期するため。 096 IStreamCommand trimAddress = CsvNavigateCommand.create(CSVPath.of("address"), new TrimRule()); 097 098 // --- パイプライン実行 --- 099 StreamConverter converter = 100 StreamConverter.create(extractOrderId, propagateProductName, trimAddress); 101 102 ByteArrayOutputStream output = new ByteArrayOutputStream(); 103 try (InputStream inputStream = new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8))) { 104 converter.run(inputStream, output); 105 } 106 107 String result = output.toString(StandardCharsets.UTF_8); 108 String expected = 109 "orderId,productName,address\r\n" 110 + "ORD-2026-0001,Laptop Computer,Tokyo Japan\r\n" 111 + "ORD-2026-0002,Wireless Mouse,Osaka Japan\r\n"; 112 log.info("期待値:\n{}", expected); 113 log.info("出力 CSV:\n{}", result); 114 log.info("ログの各行に orderId / productName が含まれていることを確認してください"); 115 } 116}