001package com.streamconverter.web;
002
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import com.streamconverter.StreamConverter;
import com.streamconverter.command.IStreamCommand;
import com.streamconverter.command.impl.csv.CsvNavigateCommand;
import com.streamconverter.command.impl.json.JsonNavigateCommand;
import com.streamconverter.command.rule.PassThroughRule;
import com.streamconverter.path.CSVPath;
import com.streamconverter.path.TreePath;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
028
029/**
030 * Web API controller for StreamConverter processing.
031 *
032 * <p>Provides REST endpoints to process data streams using existing StreamConverter functionality.
033 */
034@RestController
035@RequestMapping("/api/v1/stream")
036public class StreamProcessingController {
037
038  private static final Logger log = LoggerFactory.getLogger(StreamProcessingController.class);
039
040  private static final int MAX_PIPELINE_CONFIG_LENGTH = 1000;
041  private static final int MAX_PIPELINE_COMMANDS = 10;
042  private static final int MAX_PARAMETER_LENGTH = 500;
043
044  /**
045   * Process data stream with CSV extraction.
046   *
047   * @param inputData binary input data
048   * @param columnName CSV column name to extract
049   * @return processed data as binary stream
050   */
051  @PostMapping(
052      value = "/csv/extract",
053      consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE,
054      produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
055  public Mono<ResponseEntity<Flux<DataBuffer>>> processCsvExtraction(
056      @RequestBody Flux<DataBuffer> inputData, @RequestParam String columnName) {
057
058    log.info("Processing CSV extraction for column: {}", columnName);
059
060    return Mono
061        .fromCallable(
062            () ->
063                ResponseEntity.ok(
064                    processWithStreamConverter(
065                        inputData,
066                        CsvNavigateCommand.create(
067                            CSVPath.of(columnName), new PassThroughRule()))))
068        .onErrorResume(
069            e -> {
070              log.error("CSV extraction failed: {}", e.getMessage(), e);
071              return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build());
072            });
073  }
074
075  /**
076   * Process data stream with JSON path extraction.
077   *
078   * @param inputData binary input data
079   * @param jsonPath JSON path expression
080   * @return processed data as binary stream
081   */
082  @PostMapping(
083      value = "/json/extract",
084      consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE,
085      produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
086  public Mono<ResponseEntity<Flux<DataBuffer>>> processJsonExtraction(
087      @RequestBody Flux<DataBuffer> inputData, @RequestParam String jsonPath) {
088
089    log.info("Processing JSON extraction for path: {}", jsonPath);
090
091    return Mono
092        .fromCallable(
093            () ->
094                ResponseEntity.ok(
095                    processWithStreamConverter(
096                        inputData,
097                        JsonNavigateCommand.create(
098                            TreePath.fromJson(jsonPath), new PassThroughRule()))))
099        .onErrorResume(
100            e -> {
101              log.error("JSON extraction failed: {}", e.getMessage(), e);
102              return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build());
103            });
104  }
105
106  /**
107   * Process data stream with custom command pipeline.
108   *
109   * @param inputData binary input data
110   * @param pipelineConfig pipeline configuration header
111   * @return processed data as binary stream
112   */
113  @PostMapping(
114      value = "/process",
115      consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE,
116      produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
117  public Mono<ResponseEntity<Flux<DataBuffer>>> processWithPipeline(
118      @RequestBody Flux<DataBuffer> inputData,
119      @RequestHeader("X-Pipeline-Config") String pipelineConfig) {
120
121    return Mono
122        .fromCallable(() -> buildPipelineFromConfig(pipelineConfig))
123        .map(
124            commands -> {
125              log.info("Processing pipeline with {} commands", commands.length);
126              return ResponseEntity.ok(processWithStreamConverter(inputData, commands));
127            })
128        .onErrorResume(
129            IllegalArgumentException.class,
130            e -> {
131              log.warn("Invalid pipeline config: {}", e.getMessage());
132              return Mono.just(ResponseEntity.status(HttpStatus.BAD_REQUEST).build());
133            })
134        .onErrorResume(
135            e -> {
136              log.error("Pipeline processing failed: {}", e.getMessage(), e);
137              return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build());
138            });
139  }
140
141  /**
142   * Health check endpoint.
143   *
144   * @return simple health status
145   */
146  @GetMapping("/health")
147  public Mono<ResponseEntity<String>> health() {
148    return Mono.just(ResponseEntity.ok("StreamConverter Web API is running"));
149  }
150
151  /**
152   * Processes data using StreamConverter with given commands in a streaming fashion.
153   *
154   * @param inputData reactive stream of input data buffers
155   * @param commands stream commands to execute
156   * @return processed output as Flux of DataBuffer
157   */
158  private Flux<DataBuffer> processWithStreamConverter(
159      Flux<DataBuffer> inputData, IStreamCommand... commands) {
160    DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();
161    ExecutorService executor = Executors.newSingleThreadExecutor();
162    return Flux
163        .from(
164            DataBufferUtils.outputStreamPublisher(
165                outputStream -> {
166                  try (InputStream inputStream =
167                      DataBufferUtils.subscriberInputStream(inputData, 4096)) {
168                    StreamConverter converter = StreamConverter.create(commands);
169                    converter.run(inputStream, outputStream);
170                  } catch (IOException e) {
171                    throw new RuntimeException("Stream processing failed", e);
172                  }
173                },
174                bufferFactory,
175                executor))
176        .doFinally(signalType -> executor.shutdown());
177  }
178
179  /**
180   * Builds a pipeline from configuration string.
181   *
182   * @param config pipeline configuration (e.g., "csv:name,json:$.result,process:validator")
183   * @return array of stream commands
184   * @throws IllegalArgumentException config が null/空/長すぎる、またはコマンド数・パラメータが不正な場合
185   */
186  private IStreamCommand[] buildPipelineFromConfig(String config) {
187    if (config == null || config.isBlank()) {
188      throw new IllegalArgumentException("Pipeline config must not be null or blank");
189    }
190    if (config.length() > MAX_PIPELINE_CONFIG_LENGTH) {
191      throw new IllegalArgumentException(
192          "Pipeline config exceeds maximum length of " + MAX_PIPELINE_CONFIG_LENGTH);
193    }
194
195    String[] commandConfigs = config.split(",", -1);
196    if (commandConfigs.length > MAX_PIPELINE_COMMANDS) {
197      throw new IllegalArgumentException(
198          "Pipeline config exceeds maximum command count of " + MAX_PIPELINE_COMMANDS);
199    }
200
201    IStreamCommand[] commands = new IStreamCommand[commandConfigs.length];
202
203    for (int i = 0; i < commandConfigs.length; i++) {
204      String[] parts = commandConfigs[i].split(":", 2);
205      String commandType = parts[0].trim();
206      if (commandType.isEmpty()) {
207        throw new IllegalArgumentException("Command type must not be empty at index " + i);
208      }
209      String parameter = parts.length > 1 ? parts[1].trim() : "";
210      if (parameter.length() > MAX_PARAMETER_LENGTH) {
211        throw new IllegalArgumentException(
212            "Parameter exceeds maximum length of " + MAX_PARAMETER_LENGTH + " at index " + i);
213      }
214
215      commands[i] =
216          switch (commandType.toLowerCase(Locale.ROOT)) {
217            case "csv" -> {
218              if (parameter.isEmpty()) {
219                throw new IllegalArgumentException("csv command requires a column name at index " + i);
220              }
221              yield CsvNavigateCommand.create(CSVPath.of(parameter), new PassThroughRule());
222            }
223            case "json" -> {
224              if (parameter.isEmpty()) {
225                throw new IllegalArgumentException("json command requires a path at index " + i);
226              }
227              yield JsonNavigateCommand.create(TreePath.fromJson(parameter), new PassThroughRule());
228            }
229            case "process" -> (IStreamCommand) (in, out) -> in.transferTo(out);
230            default -> throw new IllegalArgumentException("Unknown command type: " + commandType);
231          };
232    }
233
234    log.info("Built pipeline with {} commands", commands.length);
235    return commands;
236  }
237}