001package com.streamconverter.web; 002 003import java.io.IOException; 004import java.io.InputStream; 005import java.util.Locale; 006import java.util.concurrent.ExecutorService; 007import java.util.concurrent.Executors; 008import org.slf4j.Logger; 009import org.slf4j.LoggerFactory; 010import org.springframework.core.io.buffer.DataBuffer; 011import org.springframework.core.io.buffer.DefaultDataBufferFactory; 012import org.springframework.core.io.buffer.DataBufferUtils; 013import org.springframework.http.HttpStatus; 014import org.springframework.http.MediaType; 015import org.springframework.http.ResponseEntity; 016import org.springframework.web.bind.annotation.*; 017 018import com.streamconverter.StreamConverter; 019import com.streamconverter.command.IStreamCommand; 020import com.streamconverter.command.impl.csv.CsvNavigateCommand; 021import com.streamconverter.command.impl.json.JsonNavigateCommand; 022import com.streamconverter.command.rule.PassThroughRule; 023import com.streamconverter.path.CSVPath; 024import com.streamconverter.path.TreePath; 025 026import reactor.core.publisher.Flux; 027import reactor.core.publisher.Mono; 028 029/** 030 * Web API controller for StreamConverter processing. 031 * 032 * <p>Provides REST endpoints to process data streams using existing StreamConverter functionality. 033 */ 034@RestController 035@RequestMapping("/api/v1/stream") 036public class StreamProcessingController { 037 038 private static final Logger log = LoggerFactory.getLogger(StreamProcessingController.class); 039 040 private static final int MAX_PIPELINE_CONFIG_LENGTH = 1000; 041 private static final int MAX_PIPELINE_COMMANDS = 10; 042 private static final int MAX_PARAMETER_LENGTH = 500; 043 044 /** 045 * Process data stream with CSV extraction. 046 * 047 * @param inputData binary input data 048 * @param columnName CSV column name to extract 049 * @return processed data as binary stream 050 */ 051 @PostMapping( 052 value = "/csv/extract", 053 consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE, 054 produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) 055 public Mono<ResponseEntity<Flux<DataBuffer>>> processCsvExtraction( 056 @RequestBody Flux<DataBuffer> inputData, @RequestParam String columnName) { 057 058 log.info("Processing CSV extraction for column: {}", columnName); 059 060 return Mono 061 .fromCallable( 062 () -> 063 ResponseEntity.ok( 064 processWithStreamConverter( 065 inputData, 066 CsvNavigateCommand.create( 067 CSVPath.of(columnName), new PassThroughRule())))) 068 .onErrorResume( 069 e -> { 070 log.error("CSV extraction failed: {}", e.getMessage(), e); 071 return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build()); 072 }); 073 } 074 075 /** 076 * Process data stream with JSON path extraction. 077 * 078 * @param inputData binary input data 079 * @param jsonPath JSON path expression 080 * @return processed data as binary stream 081 */ 082 @PostMapping( 083 value = "/json/extract", 084 consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE, 085 produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) 086 public Mono<ResponseEntity<Flux<DataBuffer>>> processJsonExtraction( 087 @RequestBody Flux<DataBuffer> inputData, @RequestParam String jsonPath) { 088 089 log.info("Processing JSON extraction for path: {}", jsonPath); 090 091 return Mono 092 .fromCallable( 093 () -> 094 ResponseEntity.ok( 095 processWithStreamConverter( 096 inputData, 097 JsonNavigateCommand.create( 098 TreePath.fromJson(jsonPath), new PassThroughRule())))) 099 .onErrorResume( 100 e -> { 101 log.error("JSON extraction failed: {}", e.getMessage(), e); 102 return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build()); 103 }); 104 } 105 106 /** 107 * Process data stream with custom command pipeline. 108 * 109 * @param inputData binary input data 110 * @param pipelineConfig pipeline configuration header 111 * @return processed data as binary stream 112 */ 113 @PostMapping( 114 value = "/process", 115 consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE, 116 produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) 117 public Mono<ResponseEntity<Flux<DataBuffer>>> processWithPipeline( 118 @RequestBody Flux<DataBuffer> inputData, 119 @RequestHeader("X-Pipeline-Config") String pipelineConfig) { 120 121 return Mono 122 .fromCallable(() -> buildPipelineFromConfig(pipelineConfig)) 123 .map( 124 commands -> { 125 log.info("Processing pipeline with {} commands", commands.length); 126 return ResponseEntity.ok(processWithStreamConverter(inputData, commands)); 127 }) 128 .onErrorResume( 129 IllegalArgumentException.class, 130 e -> { 131 log.warn("Invalid pipeline config: {}", e.getMessage()); 132 return Mono.just(ResponseEntity.status(HttpStatus.BAD_REQUEST).build()); 133 }) 134 .onErrorResume( 135 e -> { 136 log.error("Pipeline processing failed: {}", e.getMessage(), e); 137 return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build()); 138 }); 139 } 140 141 /** 142 * Health check endpoint. 143 * 144 * @return simple health status 145 */ 146 @GetMapping("/health") 147 public Mono<ResponseEntity<String>> health() { 148 return Mono.just(ResponseEntity.ok("StreamConverter Web API is running")); 149 } 150 151 /** 152 * Processes data using StreamConverter with given commands in a streaming fashion. 153 * 154 * @param inputData reactive stream of input data buffers 155 * @param commands stream commands to execute 156 * @return processed output as Flux of DataBuffer 157 */ 158 private Flux<DataBuffer> processWithStreamConverter( 159 Flux<DataBuffer> inputData, IStreamCommand... commands) { 160 DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory(); 161 ExecutorService executor = Executors.newSingleThreadExecutor(); 162 return Flux 163 .from( 164 DataBufferUtils.outputStreamPublisher( 165 outputStream -> { 166 try (InputStream inputStream = 167 DataBufferUtils.subscriberInputStream(inputData, 4096)) { 168 StreamConverter converter = StreamConverter.create(commands); 169 converter.run(inputStream, outputStream); 170 } catch (IOException e) { 171 throw new RuntimeException("Stream processing failed", e); 172 } 173 }, 174 bufferFactory, 175 executor)) 176 .doFinally(signalType -> executor.shutdown()); 177 } 178 179 /** 180 * Builds a pipeline from configuration string. 181 * 182 * @param config pipeline configuration (e.g., "csv:name,json:$.result,process:validator") 183 * @return array of stream commands 184 * @throws IllegalArgumentException config が null/空/長すぎる、またはコマンド数・パラメータが不正な場合 185 */ 186 private IStreamCommand[] buildPipelineFromConfig(String config) { 187 if (config == null || config.isBlank()) { 188 throw new IllegalArgumentException("Pipeline config must not be null or blank"); 189 } 190 if (config.length() > MAX_PIPELINE_CONFIG_LENGTH) { 191 throw new IllegalArgumentException( 192 "Pipeline config exceeds maximum length of " + MAX_PIPELINE_CONFIG_LENGTH); 193 } 194 195 String[] commandConfigs = config.split(",", -1); 196 if (commandConfigs.length > MAX_PIPELINE_COMMANDS) { 197 throw new IllegalArgumentException( 198 "Pipeline config exceeds maximum command count of " + MAX_PIPELINE_COMMANDS); 199 } 200 201 IStreamCommand[] commands = new IStreamCommand[commandConfigs.length]; 202 203 for (int i = 0; i < commandConfigs.length; i++) { 204 String[] parts = commandConfigs[i].split(":", 2); 205 String commandType = parts[0].trim(); 206 if (commandType.isEmpty()) { 207 throw new IllegalArgumentException("Command type must not be empty at index " + i); 208 } 209 String parameter = parts.length > 1 ? parts[1].trim() : ""; 210 if (parameter.length() > MAX_PARAMETER_LENGTH) { 211 throw new IllegalArgumentException( 212 "Parameter exceeds maximum length of " + MAX_PARAMETER_LENGTH + " at index " + i); 213 } 214 215 commands[i] = 216 switch (commandType.toLowerCase(Locale.ROOT)) { 217 case "csv" -> { 218 if (parameter.isEmpty()) { 219 throw new IllegalArgumentException("csv command requires a column name at index " + i); 220 } 221 yield CsvNavigateCommand.create(CSVPath.of(parameter), new PassThroughRule()); 222 } 223 case "json" -> { 224 if (parameter.isEmpty()) { 225 throw new IllegalArgumentException("json command requires a path at index " + i); 226 } 227 yield JsonNavigateCommand.create(TreePath.fromJson(parameter), new PassThroughRule()); 228 } 229 case "process" -> (IStreamCommand) (in, out) -> in.transferTo(out); 230 default -> throw new IllegalArgumentException("Unknown command type: " + commandType); 231 }; 232 } 233 234 log.info("Built pipeline with {} commands", commands.length); 235 return commands; 236 } 237}