001package com.streamconverter.examples;
002
003import com.streamconverter.StreamConverter;
004import com.streamconverter.command.IStreamCommand;
005import com.streamconverter.command.impl.SampleStreamCommand;
006import com.streamconverter.command.impl.csv.CsvValidateCommand;
007import java.io.ByteArrayInputStream;
008import java.io.ByteArrayOutputStream;
009import java.io.IOException;
010import java.nio.charset.StandardCharsets;
011import org.slf4j.Logger;
012import org.slf4j.LoggerFactory;
013import org.slf4j.MDC;
014
015/**
016 * 複数コマンドを束にした複雑なパイプライン処理の例
017 *
018 * <p>以下のような処理フローを実装します: 1. バリデータコマンド(入力データ検証) 2. MDC設定コマンド(ログコンテキスト設定) 3. DB変換コマンド(データベース変換処理) 4.
019 * 通信コマンド(外部API呼び出し) 5. バリデータコマンド(レスポンス検証) 6. DB逆変換コマンド(逆変換処理)
020 */
021public class ComplexPipelineExample {
022  private static final Logger logger = LoggerFactory.getLogger(ComplexPipelineExample.class);
023
024  /**
025   * メインメソッド
026   *
027   * @param args コマンドライン引数
028   */
029  public static void main(String[] args) {
030    logger.info("🔧 Complex Pipeline Processing Example");
031    logger.info("=====================================\n");
032
033    try {
034      demonstrateComplexPipeline();
035    } catch (Exception e) {
036      logger.error("Complex pipeline demonstration failed: {}", e.getMessage(), e);
037    }
038  }
039
040  /** 複雑なパイプライン処理のデモンストレーション */
041  private static void demonstrateComplexPipeline() throws IOException {
042    // サンプル入力データ(CSV形式)
043    String inputCsvData =
044        """
045        id,name,email,department
046        1,John Doe,john@example.com,Engineering
047        2,Jane Smith,jane@example.com,Marketing
048        3,Bob Johnson,bob@example.com,Sales
049        """;
050
051    logger.info("📋 Input Data:");
052    logger.info(inputCsvData);
053
054    // 複数コマンドを束にしたパイプライン構築
055    IStreamCommand[] complexPipeline = {
056      // 1. 入力バリデータコマンド
057      createInputValidator(),
058
059      // 2. MDC設定コマンド(ログコンテキスト)
060      createMdcSetupCommand(),
061
062      // 3. DB変換コマンド(データベース変換処理)
063      createDbTransformCommand(),
064
065      // 4. 通信コマンド(外部API呼び出し)
066      createCommunicationCommand(),
067
068      // 5. レスポンスバリデータコマンド
069      createResponseValidator(),
070
071      // 6. DB逆変換コマンド
072      createDbReverseTransformCommand()
073    };
074
075    // パイプライン実行
076    StreamConverter converter = StreamConverter.create(complexPipeline);
077
078    ByteArrayInputStream inputStream =
079        new ByteArrayInputStream(inputCsvData.getBytes(StandardCharsets.UTF_8));
080    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
081
082    logger.info("🚀 Executing complex pipeline with {} commands...", complexPipeline.length);
083
084    long startTime = System.currentTimeMillis();
085    converter.run(inputStream, outputStream);
086    long endTime = System.currentTimeMillis();
087
088    // 結果出力
089    String result = outputStream.toString(StandardCharsets.UTF_8);
090    logger.info("✅ Pipeline completed successfully in {} ms", endTime - startTime);
091    logger.info("📤 Final Result:");
092    logger.info(result);
093
094    logger.info("\n" + "=".repeat(60));
095  }
096
097  /** 入力バリデータコマンドを作成 */
098  private static IStreamCommand createInputValidator() {
099    logger.info("🔍 Creating input validator command");
100
101    // CSV必須カラムを定義
102    String[] requiredColumns = {"id", "name", "email", "department"};
103
104    // CsvValidateCommandを使用して入力検証
105    return new CsvValidateCommand(requiredColumns);
106  }
107
108  /** MDC設定コマンドを作成 */
109  private static IStreamCommand createMdcSetupCommand() {
110    logger.info("📝 Creating MDC setup command");
111
112    return new IStreamCommand() {
113      @Override
114      public void execute(java.io.InputStream inputStream, java.io.OutputStream outputStream)
115          throws IOException {
116        logger.info("Setting up MDC context");
117
118        // MDCにコンテキスト情報を設定
119        MDC.put("requestId", "REQ-" + System.currentTimeMillis());
120        MDC.put("pipelineStage", "mdc-setup");
121        MDC.put("processType", "complex-pipeline");
122
123        logger.info("MDC context configured successfully");
124
125        // データをそのまま次のコマンドに渡す
126        inputStream.transferTo(outputStream);
127
128        logger.info("MDC setup command completed");
129      }
130    };
131  }
132
133  /** DB変換コマンドを作成 */
134  private static IStreamCommand createDbTransformCommand() {
135    logger.info("🔄 Creating DB transform command");
136
137    return new SampleStreamCommand("db-transform") {
138      @Override
139      public void executeInternal(
140          java.io.InputStream inputStream, java.io.OutputStream outputStream) throws IOException {
141        MDC.put("pipelineStage", "db-transform");
142        logger.info("Executing database transformation");
143
144        // 実際のDB変換処理をシミュレート
145        super.executeInternal(inputStream, outputStream);
146
147        logger.info("Database transformation completed");
148      }
149    };
150  }
151
152  /** 通信コマンドを作成 */
153  private static IStreamCommand createCommunicationCommand() {
154    logger.info("🌐 Creating communication command");
155
156    return new SampleStreamCommand("http-communication") {
157      @Override
158      public void executeInternal(
159          java.io.InputStream inputStream, java.io.OutputStream outputStream) throws IOException {
160        MDC.put("pipelineStage", "communication");
161        logger.info("Executing external API communication");
162
163        // 実際のHTTP通信をシミュレート
164        // 本来であればSendHttpCommandを使用
165        super.executeInternal(inputStream, outputStream);
166
167        logger.info("External API communication completed");
168      }
169    };
170  }
171
172  /** レスポンスバリデータコマンドを作成 */
173  private static IStreamCommand createResponseValidator() {
174    logger.info("✅ Creating response validator command");
175
176    return new SampleStreamCommand("response-validator") {
177      @Override
178      public void executeInternal(
179          java.io.InputStream inputStream, java.io.OutputStream outputStream) throws IOException {
180        MDC.put("pipelineStage", "response-validation");
181        logger.info("Validating API response");
182
183        // レスポンスバリデーション処理をシミュレート
184        super.executeInternal(inputStream, outputStream);
185
186        logger.info("Response validation completed");
187      }
188    };
189  }
190
191  /** DB逆変換コマンドを作成 */
192  private static IStreamCommand createDbReverseTransformCommand() {
193    logger.info("🔙 Creating DB reverse transform command");
194
195    return new SampleStreamCommand("db-reverse-transform") {
196      @Override
197      public void executeInternal(
198          java.io.InputStream inputStream, java.io.OutputStream outputStream) throws IOException {
199        MDC.put("pipelineStage", "db-reverse-transform");
200        logger.info("Executing database reverse transformation");
201
202        // DB逆変換処理をシミュレート
203        super.executeInternal(inputStream, outputStream);
204
205        logger.info("Database reverse transformation completed");
206
207        // MDCクリーンアップ
208        MDC.clear();
209        logger.info("MDC context cleared");
210      }
211    };
212  }
213}