001/** 002 * Copyright (c) 2023, Stream Converter Project All rights reserved. 003 * 指定された通信先にOutputStreamを送信するコマンドクラス。 004 * 005 * <p>このクラスは、指定された通信先にOutputStreamを送信するためのコマンドを実装します。 ストリームを使用して、データを送信します。 送信先のURLはコンストラクタで指定されます。 006 * 送信先のURLは、HTTP POSTリクエストを使用してデータを送信します。 送信先のURLは、HTTPまたはHTTPSで始まる必要があります。 007 * 送信先のURLは、コンストラクタで指定されたURLに基づいて決定されます。 008 * 009 * <p>このクラスは、ストリーム変換のコマンドを実装するための抽象クラスを拡張しています。 ストリーム変換のコマンドは、ストリームを使用してデータを変換するためのものです。 010 * 011 * <p>レスポンスを受信してOutputStreamに書き込むことができます。 012 */ 013package com.streamconverter.command.impl; 014 015import com.google.common.net.InetAddresses; 016import com.streamconverter.command.AbstractStreamCommand; 017import java.io.IOException; 018import java.io.InputStream; 019import java.io.OutputStream; 020import java.net.InetAddress; 021import java.net.URI; 022import java.net.URISyntaxException; 023import java.time.Duration; 024import java.util.Objects; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027import org.springframework.http.HttpHeaders; 028import org.springframework.http.MediaType; 029import org.springframework.http.client.reactive.ReactorClientHttpConnector; 030import org.springframework.web.reactive.function.BodyInserters; 031import org.springframework.web.reactive.function.client.WebClient; 032import reactor.netty.http.client.HttpClient; 033 034/** 指定された通信先にOutputStreamを送信するコマンドクラス。 */ 035public class SendHttpCommand extends AbstractStreamCommand { 036 037 private static final Logger logger = LoggerFactory.getLogger(SendHttpCommand.class); 038 039 private final String url; 040 private final WebClient webClient; 041 042 /** 043 * デフォルトコンストラクタ 044 * 045 * @param url 送信先のURL 046 * @throws IllegalArgumentException URLが無効な場合 047 */ 048 public SendHttpCommand(String url) { 049 this(url, createDefaultWebClient()); 050 } 051 052 /** 053 * カスタム {@link WebClient} を使用するコンストラクタ。 054 * 055 * <p>主にテストや特殊なHTTPクライアント設定のために使用する。URLの検証は通常コンストラクタと同様に適用する。 056 * 057 * @param url 送信先のURL 058 * @param webClient 使用するWebClient 059 */ 060 public SendHttpCommand(String url, WebClient webClient) { 061 super(); 062 this.url = validateAndSanitizeUrl(url); 063 this.webClient = Objects.requireNonNull(webClient, "webClient must not be null"); 064 } 065 066 private static WebClient createDefaultWebClient() { 067 // Simple HttpClient configuration for Netty 4.1.123.Final compatibility 068 HttpClient httpClient = 069 HttpClient.create() 070 .responseTimeout(Duration.ofSeconds(30)) 071 .option(io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) 072 .keepAlive(false); // Disable keep-alive to avoid connection pool issues 073 074 return WebClient.builder() 075 .clientConnector(new ReactorClientHttpConnector(httpClient)) 076 .codecs( 077 configurer -> 078 // 1 MB limit for error response bodies (used by onStatus bodyToMono). 079 // The streaming response path (bodyToFlux) bypasses this buffer entirely. 080 configurer.defaultCodecs().maxInMemorySize(1024 * 1024)) 081 .build(); 082 } 083 084 /** 085 * URLの検証とサニタイゼーションを行う 086 * 087 * @param url 検証するURL 088 * @return 検証済みURL 089 * @throws IllegalArgumentException URLが無効な場合 090 */ 091 private String validateAndSanitizeUrl(String url) { 092 Objects.requireNonNull(url, "URL cannot be null"); 093 094 String trimmedUrl = url.trim(); 095 if (trimmedUrl.isEmpty()) { 096 throw new IllegalArgumentException("URL cannot be empty"); 097 } 098 099 try { 100 URI uri = new URI(trimmedUrl); 101 String scheme = uri.getScheme(); 102 103 if (scheme == null) { 104 throw new IllegalArgumentException("URL must have a scheme (http or https)"); 105 } 106 107 if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme)) { 108 throw new IllegalArgumentException("Only HTTP and HTTPS protocols are allowed"); 109 } 110 111 String host = uri.getHost(); 112 if (host == null || host.trim().isEmpty()) { 113 throw new IllegalArgumentException("URL must have a valid host"); 114 } 115 116 // ローカルホストや内部IPアドレスへのアクセスを防ぐ 117 if (isLocalhost(host) || isPrivateIpAddress(host)) { 118 throw new IllegalArgumentException( 119 "Access to localhost or private IP addresses is not allowed"); 120 } 121 122 return trimmedUrl; 123 } catch (URISyntaxException e) { 124 throw new IllegalArgumentException("Invalid URL format: " + e.getMessage(), e); 125 } 126 } 127 128 /** ローカルホストかどうかを判定する */ 129 private boolean isLocalhost(String host) { 130 if (host == null) { 131 return false; 132 } 133 134 // Handle IPv6 addresses with brackets 135 String cleanHost = 136 host.startsWith("[") && host.endsWith("]") ? host.substring(1, host.length() - 1) : host; 137 138 return "localhost".equalsIgnoreCase(cleanHost) 139 || "127.0.0.1".equals(cleanHost) 140 || "::1".equals(cleanHost); 141 } 142 143 /** プライベートIPアドレスかどうかを判定する(Guava使用) */ 144 private boolean isPrivateIpAddress(String host) { 145 try { 146 // GuavaのInetAddressesを使用してIPアドレスを解析 147 InetAddress address = InetAddresses.forString(host); 148 // RFC 1918準拠のプライベートアドレス判定 149 return address.isSiteLocalAddress() || address.isLoopbackAddress(); 150 } catch (IllegalArgumentException e) { 151 // IPアドレス形式でない場合(ホスト名など)はfalseを返す 152 return false; 153 } 154 } 155 156 /** 157 * ストリームを指定されたURLに送信します。 158 * 159 * @param inputStream 入力ストリーム 160 * @param outputStream 出力ストリーム 161 * @throws IOException 入出力エラーが発生した場合 162 */ 163 @Override 164 public void execute(InputStream inputStream, OutputStream outputStream) 165 throws IOException { 166 Objects.requireNonNull(inputStream, "inputStream must not be null"); 167 Objects.requireNonNull(outputStream, "outputStream must not be null"); 168 169 logger.info("Sending HTTP POST request to: {}", url); 170 171 try { 172 // Track total bytes written for better error reporting 173 final long[] totalBytesWritten = {0L}; 174 175 // 大容量データに対応するため、ストリーミング処理を使用 176 // WebClientでストリーミングレスポンスを処理 177 webClient 178 .post() 179 .uri(url) 180 .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_OCTET_STREAM_VALUE) 181 .header(HttpHeaders.USER_AGENT, "StreamConverter/1.0") 182 .body( 183 BodyInserters.fromDataBuffers( 184 org.springframework.core.io.buffer.DataBufferUtils.readInputStream( 185 () -> inputStream, 186 org.springframework.core.io.buffer.DefaultDataBufferFactory.sharedInstance, 187 8192))) // 8KB chunks for memory efficiency 188 .retrieve() 189 .onStatus( 190 status -> !status.is2xxSuccessful(), 191 response -> 192 response 193 .bodyToMono(String.class) 194 .defaultIfEmpty("") 195 .map( 196 errorBody -> 197 new RuntimeException( 198 String.format( 199 "HTTP request failed: status=%d, url=%s, response=%s", 200 response.statusCode().value(), url, errorBody)))) 201 .bodyToFlux(org.springframework.core.io.buffer.DataBuffer.class) 202 .timeout(Duration.ofMinutes(5)) // 大容量データ処理のため5分に延長 203 .doOnNext( 204 dataBuffer -> { 205 // Ensure DataBuffer is always released, even if write fails 206 try { 207 // ストリーミング処理:8KBずつレスポンスを処理 208 byte[] bytes = new byte[dataBuffer.readableByteCount()]; 209 dataBuffer.read(bytes); 210 outputStream.write(bytes); 211 totalBytesWritten[0] += bytes.length; 212 } catch (IOException e) { 213 throw new RuntimeException( 214 String.format( 215 "Failed to write response data to output stream (url=%s, bytesWritten=%d)", 216 url, totalBytesWritten[0]), 217 e); 218 } finally { 219 org.springframework.core.io.buffer.DataBufferUtils.release(dataBuffer); 220 } 221 }) 222 .doOnComplete( 223 () -> { 224 try { 225 outputStream.flush(); 226 logger.info("HTTP response streaming completed successfully"); 227 } catch (IOException e) { 228 throw new RuntimeException("Failed to flush output stream", e); 229 } 230 }) 231 .blockLast(); // Intentionally synchronous: AbstractStreamCommand interface requires 232 // blocking execution 233 // for compatibility with existing command pipeline. Alternative: use subscribe() 234 // with CompletableFuture for true async, but would break command interface contract. 235 236 } catch (RuntimeException e) { 237 // WebClient error responses are wrapped in RuntimeException 238 String errorMessage = "HTTP request failed: " + url + " - " + e.getMessage(); 239 logger.error(errorMessage, e); 240 throw new IOException(errorMessage, e); 241 } 242 } 243}