package com.streamconverter.web;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import com.streamconverter.StreamConverter;
import com.streamconverter.command.IStreamCommand;
import com.streamconverter.command.impl.SampleStreamCommand;
import com.streamconverter.command.impl.csv.CsvNavigateCommand;
import com.streamconverter.command.impl.json.JsonNavigateCommand;
import com.streamconverter.command.rule.PassThroughRule;
import com.streamconverter.path.CSVPath;
import com.streamconverter.path.TreePath;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Web API controller for StreamConverter processing.
 *
 * <p>Provides REST endpoints to process data streams using existing StreamConverter functionality.
 */
@RestController
@RequestMapping("/api/v1/stream")
public class StreamProcessingController {

  private static final Logger log = LoggerFactory.getLogger(StreamProcessingController.class);

  /**
   * Process data stream with CSV extraction.
   *
   * @param inputData binary input data
   * @param columnName CSV column name to extract
   * @return processed data as binary stream
   */
  @PostMapping(
      value = "/csv/extract",
      consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE,
      produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
  public Mono<ResponseEntity<Flux<DataBuffer>>> processCsvExtraction(
      @RequestBody Flux<DataBuffer> inputData, @RequestParam String columnName) {

    log.info("Processing CSV extraction for column: {}", columnName);

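    // The Callable only assembles the response; onErrorReturn therefore covers exceptions thrown
    // while the pipeline is built, whereas errors raised later, during streaming, surface on the
    // response Flux itself.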
    return Mono
        .fromCallable(
            () ->
                ResponseEntity.ok(
                    processWithStreamConverter(
                        inputData,
                        CsvNavigateCommand.create(
                            new CSVPath(columnName), new PassThroughRule()))))
        .onErrorReturn(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build());
  }

  /**
   * Process data stream with JSON path extraction.
   *
   * @param inputData binary input data
   * @param jsonPath JSON path expression (e.g. {@code $.result})
   * @return processed data as binary stream
   */
  @PostMapping(
      value = "/json/extract",
      consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE,
      produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
  public Mono<ResponseEntity<Flux<DataBuffer>>> processJsonExtraction(
      @RequestBody Flux<DataBuffer> inputData, @RequestParam String jsonPath) {

    log.info("Processing JSON extraction for path: {}", jsonPath);

    return Mono
        .fromCallable(
            () ->
                ResponseEntity.ok(
                    processWithStreamConverter(
                        inputData,
                        JsonNavigateCommand.create(
                            TreePath.fromJson(jsonPath), new PassThroughRule()))))
        .onErrorReturn(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build());
  }

  /**
   * Process data stream with a custom command pipeline.
   *
   * @param inputData binary input data
   * @param pipelineConfig pipeline configuration header, a comma-separated list of {@code
   *     type:parameter} entries (e.g. {@code csv:name,json:$.result,process:validator})
   * @return processed data as binary stream
   */
  @PostMapping(
      value = "/process",
      consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE,
      produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
  public Mono<ResponseEntity<Flux<DataBuffer>>> processWithPipeline(
      @RequestBody Flux<DataBuffer> inputData,
      @RequestHeader("X-Pipeline-Config") String pipelineConfig) {

    log.info("Processing with pipeline config: {}", pipelineConfig);

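    // buildPipelineFromConfig runs inside the Callable, so an invalid configuration is mapped to a
    // 500 response by onErrorReturn instead of propagating out of the controller.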
    return Mono
        .fromCallable(
            () ->
                ResponseEntity.ok(
                    processWithStreamConverter(
                        inputData, buildPipelineFromConfig(pipelineConfig))))
        .onErrorReturn(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build());
  }

  /**
   * Health check endpoint.
   *
   * @return simple health status
   */
  @GetMapping("/health")
  public Mono<ResponseEntity<String>> health() {
    return Mono.just(ResponseEntity.ok("StreamConverter Web API is running"));
  }

  /**
   * Processes data using StreamConverter with the given commands in a streaming fashion.
   *
   * @param inputData reactive stream of input data buffers
   * @param commands stream commands to execute
   * @return processed output as a Flux of DataBuffer
   */
  private Flux<DataBuffer> processWithStreamConverter(
      Flux<DataBuffer> inputData, IStreamCommand... commands) {
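    // StreamConverter consumes a blocking InputStream/OutputStream pair, so the conversion runs on
    // a dedicated single-thread executor while outputStreamPublisher re-publishes everything
    // written to the OutputStream as a Flux<DataBuffer>.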
    DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    return Flux
        .from(
            DataBufferUtils.outputStreamPublisher(
                outputStream -> {
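                  // subscriberInputStream adapts the reactive request body into a blocking
                  // InputStream; the int argument is the demand, i.e. the number of buffers
                  // requested at a time while the stream is consumed.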
                  try (InputStream inputStream =
                      DataBufferUtils.subscriberInputStream(inputData, 4096)) {
                    StreamConverter converter = StreamConverter.create(commands);
                    converter.run(inputStream, outputStream);
                  } catch (IOException e) {
                    throw new RuntimeException("Stream processing failed", e);
                  }
                },
                bufferFactory,
                executor))
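        // Release the worker thread once the response stream completes, errors, or is cancelled.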
        .doFinally(signalType -> executor.shutdown());
  }

  /**
   * Builds a pipeline from a configuration string.
   *
   * @param config pipeline configuration (e.g., "csv:name,json:$.result,process:validator")
   * @return array of stream commands
   * @throws IllegalArgumentException if the configuration contains an unknown command type
   */
  private IStreamCommand[] buildPipelineFromConfig(String config) {
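    // Each comma-separated entry has the form "type:parameter"; commands are created in the order
    // the entries appear.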
    String[] commandConfigs = config.split(",");
    IStreamCommand[] commands = new IStreamCommand[commandConfigs.length];

    for (int i = 0; i < commandConfigs.length; i++) {
      String[] parts = commandConfigs[i].split(":", 2);
      String commandType = parts[0].trim();
      String parameter = parts.length > 1 ? parts[1].trim() : "";

      commands[i] =
          switch (commandType.toLowerCase()) {
            case "csv" -> CsvNavigateCommand.create(new CSVPath(parameter), new PassThroughRule());
            case "json" ->
                JsonNavigateCommand.create(TreePath.fromJson(parameter), new PassThroughRule());
            case "process" -> new SampleStreamCommand(parameter);
            default -> throw new IllegalArgumentException("Unknown command type: " + commandType);
          };
    }

    log.info("Built pipeline with {} commands", commands.length);
    return commands;
  }
}