001package com.streamconverter.web; 002 003import java.io.IOException; 004import java.io.InputStream; 005import java.util.concurrent.ExecutorService; 006import java.util.concurrent.Executors; 007import org.slf4j.Logger; 008import org.slf4j.LoggerFactory; 009import org.springframework.core.io.buffer.DataBuffer; 010import org.springframework.core.io.buffer.DefaultDataBufferFactory; 011import org.springframework.core.io.buffer.DataBufferUtils; 012import org.springframework.http.HttpStatus; 013import org.springframework.http.MediaType; 014import org.springframework.http.ResponseEntity; 015import org.springframework.web.bind.annotation.*; 016 017import com.streamconverter.StreamConverter; 018import com.streamconverter.command.IStreamCommand; 019import com.streamconverter.command.impl.SampleStreamCommand; 020import com.streamconverter.command.impl.csv.CsvNavigateCommand; 021import com.streamconverter.command.impl.json.JsonNavigateCommand; 022import com.streamconverter.command.rule.PassThroughRule; 023import com.streamconverter.path.CSVPath; 024import com.streamconverter.path.TreePath; 025 026import reactor.core.publisher.Flux; 027import reactor.core.publisher.Mono; 028 029/** 030 * Web API controller for StreamConverter processing. 031 * 032 * <p>Provides REST endpoints to process data streams using existing StreamConverter functionality. 033 */ 034@RestController 035@RequestMapping("/api/v1/stream") 036public class StreamProcessingController { 037 038 private static final Logger log = LoggerFactory.getLogger(StreamProcessingController.class); 039 040 /** 041 * Process data stream with CSV extraction. 042 * 043 * @param inputData binary input data 044 * @param columnName CSV column name to extract 045 * @return processed data as binary stream 046 */ 047 @PostMapping( 048 value = "/csv/extract", 049 consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE, 050 produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) 051 public Mono<ResponseEntity<Flux<DataBuffer>>> processCsvExtraction( 052 @RequestBody Flux<DataBuffer> inputData, @RequestParam String columnName) { 053 054 log.info("Processing CSV extraction for column: {}", columnName); 055 056 return Mono 057 .fromCallable( 058 () -> 059 ResponseEntity.ok( 060 processWithStreamConverter( 061 inputData, 062 CsvNavigateCommand.create( 063 new CSVPath(columnName), new PassThroughRule())))) 064 .onErrorReturn(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build()); 065 } 066 067 /** 068 * Process data stream with JSON path extraction. 069 * 070 * @param inputData binary input data 071 * @param jsonPath JSON path expression 072 * @return processed data as binary stream 073 */ 074 @PostMapping( 075 value = "/json/extract", 076 consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE, 077 produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) 078 public Mono<ResponseEntity<Flux<DataBuffer>>> processJsonExtraction( 079 @RequestBody Flux<DataBuffer> inputData, @RequestParam String jsonPath) { 080 081 log.info("Processing JSON extraction for path: {}", jsonPath); 082 083 return Mono 084 .fromCallable( 085 () -> 086 ResponseEntity.ok( 087 processWithStreamConverter( 088 inputData, 089 JsonNavigateCommand.create( 090 TreePath.fromJson(jsonPath), new PassThroughRule())))) 091 .onErrorReturn(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build()); 092 } 093 094 /** 095 * Process data stream with custom command pipeline. 096 * 097 * @param inputData binary input data 098 * @param pipelineConfig pipeline configuration header 099 * @return processed data as binary stream 100 */ 101 @PostMapping( 102 value = "/process", 103 consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE, 104 produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) 105 public Mono<ResponseEntity<Flux<DataBuffer>>> processWithPipeline( 106 @RequestBody Flux<DataBuffer> inputData, 107 @RequestHeader("X-Pipeline-Config") String pipelineConfig) { 108 109 log.info("Processing with pipeline config: {}", pipelineConfig); 110 111 return Mono 112 .fromCallable( 113 () -> 114 ResponseEntity.ok( 115 processWithStreamConverter( 116 inputData, buildPipelineFromConfig(pipelineConfig)))) 117 .onErrorReturn(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build()); 118 } 119 120 /** 121 * Health check endpoint. 122 * 123 * @return simple health status 124 */ 125 @GetMapping("/health") 126 public Mono<ResponseEntity<String>> health() { 127 return Mono.just(ResponseEntity.ok("StreamConverter Web API is running")); 128 } 129 130 /** 131 * Processes data using StreamConverter with given commands in a streaming fashion. 132 * 133 * @param inputData reactive stream of input data buffers 134 * @param commands stream commands to execute 135 * @return processed output as Flux of DataBuffer 136 */ 137 private Flux<DataBuffer> processWithStreamConverter( 138 Flux<DataBuffer> inputData, IStreamCommand... commands) { 139 DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory(); 140 ExecutorService executor = Executors.newSingleThreadExecutor(); 141 return Flux 142 .from( 143 DataBufferUtils.outputStreamPublisher( 144 outputStream -> { 145 try (InputStream inputStream = 146 DataBufferUtils.subscriberInputStream(inputData, 4096)) { 147 StreamConverter converter = StreamConverter.create(commands); 148 converter.run(inputStream, outputStream); 149 } catch (IOException e) { 150 throw new RuntimeException("Stream processing failed", e); 151 } 152 }, 153 bufferFactory, 154 executor)) 155 .doFinally(signalType -> executor.shutdown()); 156 } 157 158 /** 159 * Builds a pipeline from configuration string. 160 * 161 * @param config pipeline configuration (e.g., "csv:name,json:$.result,process:validator") 162 * @return array of stream commands 163 */ 164 private IStreamCommand[] buildPipelineFromConfig(String config) { 165 String[] commandConfigs = config.split(","); 166 IStreamCommand[] commands = new IStreamCommand[commandConfigs.length]; 167 168 for (int i = 0; i < commandConfigs.length; i++) { 169 String[] parts = commandConfigs[i].split(":", 2); 170 String commandType = parts[0].trim(); 171 String parameter = parts.length > 1 ? parts[1].trim() : ""; 172 173 commands[i] = 174 switch (commandType.toLowerCase()) { 175 case "csv" -> CsvNavigateCommand.create(new CSVPath(parameter), new PassThroughRule()); 176 case "json" -> JsonNavigateCommand.create(TreePath.fromJson(parameter), new PassThroughRule()); 177 case "process" -> new SampleStreamCommand(parameter); 178 default -> throw new IllegalArgumentException("Unknown command type: " + commandType); 179 }; 180 } 181 182 log.info("Built pipeline with {} commands", commands.length); 183 return commands; 184 } 185}