001/** 002 * Copyright (c) 2023, Stream Converter Project All rights reserved. 003 * 指定された通信先にOutputStreamを送信するコマンドクラス。 004 * 005 * <p>このクラスは、指定された通信先にOutputStreamを送信するためのコマンドを実装します。 ストリームを使用して、データを送信します。 送信先のURLはコンストラクタで指定されます。 006 * 送信先のURLは、HTTP POSTリクエストを使用してデータを送信します。 送信先のURLは、HTTPまたはHTTPSで始まる必要があります。 007 * 送信先のURLは、コンストラクタで指定されたURLに基づいて決定されます。 008 * 009 * <p>このクラスは、ストリーム変換のコマンドを実装するための抽象クラスを拡張しています。 ストリーム変換のコマンドは、ストリームを使用してデータを変換するためのものです。 010 * 011 * <p>レスポンスを受信してOutputStreamに書き込むことができます。 012 */ 013package com.streamconverter.command.impl; 014 015import com.google.common.net.InetAddresses; 016import com.streamconverter.command.AbstractStreamCommand; 017import java.io.IOException; 018import java.io.InputStream; 019import java.io.OutputStream; 020import java.net.InetAddress; 021import java.net.URI; 022import java.net.URISyntaxException; 023import java.time.Duration; 024import java.util.Objects; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027import org.springframework.http.HttpHeaders; 028import org.springframework.http.MediaType; 029import org.springframework.http.client.reactive.ReactorClientHttpConnector; 030import org.springframework.web.reactive.function.BodyInserters; 031import org.springframework.web.reactive.function.client.WebClient; 032import reactor.netty.http.client.HttpClient; 033 034/** 指定された通信先にOutputStreamを送信するコマンドクラス。 */ 035public class SendHttpCommand extends AbstractStreamCommand { 036 037 private static final Logger logger = LoggerFactory.getLogger(SendHttpCommand.class); 038 039 private String url; 040 private final WebClient webClient; 041 042 /** 043 * デフォルトコンストラクタ 044 * 045 * @param url 送信先のURL 046 * @throws IllegalArgumentException URLが無効な場合 047 */ 048 public SendHttpCommand(String url) { 049 super(); 050 this.url = validateAndSanitizeUrl(url); 051 052 // Simple HttpClient configuration for Netty 4.1.123.Final compatibility 053 HttpClient httpClient = 054 HttpClient.create() 055 .responseTimeout(Duration.ofSeconds(30)) 056 .option(io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) 057 .keepAlive(false); // Disable keep-alive to avoid connection pool issues 058 059 this.webClient = 060 WebClient.builder() 061 .clientConnector(new ReactorClientHttpConnector(httpClient)) 062 .codecs( 063 configurer -> 064 configurer.defaultCodecs().maxInMemorySize(-1)) // Unlimited for streaming 065 .build(); 066 } 067 068 /** 069 * URLの検証とサニタイゼーションを行う 070 * 071 * @param url 検証するURL 072 * @return 検証済みURL 073 * @throws IllegalArgumentException URLが無効な場合 074 */ 075 private String validateAndSanitizeUrl(String url) { 076 Objects.requireNonNull(url, "URL cannot be null"); 077 078 String trimmedUrl = url.trim(); 079 if (trimmedUrl.isEmpty()) { 080 throw new IllegalArgumentException("URL cannot be empty"); 081 } 082 083 try { 084 URI uri = new URI(trimmedUrl); 085 String scheme = uri.getScheme(); 086 087 if (scheme == null) { 088 throw new IllegalArgumentException("URL must have a scheme (http or https)"); 089 } 090 091 if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme)) { 092 throw new IllegalArgumentException("Only HTTP and HTTPS protocols are allowed"); 093 } 094 095 String host = uri.getHost(); 096 if (host == null || host.trim().isEmpty()) { 097 throw new IllegalArgumentException("URL must have a valid host"); 098 } 099 100 // ローカルホストや内部IPアドレスへのアクセスを防ぐ 101 if (isLocalhost(host) || isPrivateIpAddress(host)) { 102 throw new IllegalArgumentException( 103 "Access to localhost or private IP addresses is not allowed"); 104 } 105 106 return trimmedUrl; 107 } catch (URISyntaxException e) { 108 throw new IllegalArgumentException("Invalid URL format: " + e.getMessage(), e); 109 } 110 } 111 112 /** ローカルホストかどうかを判定する */ 113 private boolean isLocalhost(String host) { 114 if (host == null) { 115 return false; 116 } 117 118 // Handle IPv6 addresses with brackets 119 String cleanHost = 120 host.startsWith("[") && host.endsWith("]") ? host.substring(1, host.length() - 1) : host; 121 122 return "localhost".equalsIgnoreCase(cleanHost) 123 || "127.0.0.1".equals(cleanHost) 124 || "::1".equals(cleanHost); 125 } 126 127 /** プライベートIPアドレスかどうかを判定する(Guava使用) */ 128 private boolean isPrivateIpAddress(String host) { 129 try { 130 // GuavaのInetAddressesを使用してIPアドレスを解析 131 InetAddress address = InetAddresses.forString(host); 132 // RFC 1918準拠のプライベートアドレス判定 133 return address.isSiteLocalAddress() || address.isLoopbackAddress(); 134 } catch (IllegalArgumentException e) { 135 // IPアドレス形式でない場合(ホスト名など)はfalseを返す 136 return false; 137 } 138 } 139 140 /** 141 * ストリームを指定されたURLに送信します。 142 * 143 * @param inputStream 入力ストリーム 144 * @param outputStream 出力ストリーム 145 * @throws IOException 入出力エラーが発生した場合 146 */ 147 @Override 148 protected void executeInternal(InputStream inputStream, OutputStream outputStream) 149 throws IOException { 150 Objects.requireNonNull(inputStream, "inputStream must not be null"); 151 Objects.requireNonNull(outputStream, "outputStream must not be null"); 152 153 logger.info("Sending HTTP POST request to: {}", url); 154 155 try { 156 // Track total bytes written for better error reporting 157 final long[] totalBytesWritten = {0L}; 158 159 // 大容量データに対応するため、ストリーミング処理を使用 160 // WebClientでストリーミングレスポンスを処理 161 webClient 162 .post() 163 .uri(url) 164 .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_OCTET_STREAM_VALUE) 165 .header(HttpHeaders.USER_AGENT, "StreamConverter/1.0") 166 .body( 167 BodyInserters.fromDataBuffers( 168 org.springframework.core.io.buffer.DataBufferUtils.readInputStream( 169 () -> inputStream, 170 org.springframework.core.io.buffer.DefaultDataBufferFactory.sharedInstance, 171 8192))) // 8KB chunks for memory efficiency 172 .retrieve() 173 .onStatus( 174 status -> !status.is2xxSuccessful(), 175 response -> 176 response 177 .bodyToMono(String.class) 178 .defaultIfEmpty("") 179 .map( 180 errorBody -> 181 new RuntimeException( 182 String.format( 183 "HTTP request failed: status=%d, url=%s, response=%s", 184 response.statusCode().value(), url, errorBody)))) 185 .bodyToFlux(org.springframework.core.io.buffer.DataBuffer.class) 186 .timeout(Duration.ofMinutes(5)) // 大容量データ処理のため5分に延長 187 .doOnNext( 188 dataBuffer -> { 189 // Ensure DataBuffer is always released, even if write fails 190 try { 191 // ストリーミング処理:8KBずつレスポンスを処理 192 byte[] bytes = new byte[dataBuffer.readableByteCount()]; 193 dataBuffer.read(bytes); 194 outputStream.write(bytes); 195 totalBytesWritten[0] += bytes.length; 196 } catch (IOException e) { 197 throw new RuntimeException( 198 String.format( 199 "Failed to write response data to output stream (url=%s, bytesWritten=%d)", 200 url, totalBytesWritten[0]), 201 e); 202 } finally { 203 org.springframework.core.io.buffer.DataBufferUtils.release(dataBuffer); 204 } 205 }) 206 .doOnComplete( 207 () -> { 208 try { 209 outputStream.flush(); 210 logger.info("HTTP response streaming completed successfully"); 211 } catch (IOException e) { 212 throw new RuntimeException("Failed to flush output stream", e); 213 } 214 }) 215 .blockLast(); // Intentionally synchronous: AbstractStreamCommand interface requires 216 // blocking execution 217 // for compatibility with existing command pipeline. Alternative: use subscribe() 218 // with CompletableFuture for true async, but would break command interface contract. 219 220 } catch (RuntimeException e) { 221 // WebClient error responses are wrapped in RuntimeException 222 String errorMessage = "HTTP request failed: " + url + " - " + e.getMessage(); 223 logger.error(errorMessage, e); 224 throw new IOException(errorMessage, e); 225 } catch (Exception e) { 226 String errorMessage = "HTTP request failed: " + url; 227 logger.error(errorMessage, e); 228 throw new IOException(errorMessage, e); 229 } 230 } 231}