001/**
002 * Copyright (c) 2023, Stream Converter Project All rights reserved.
003 * 指定された通信先にOutputStreamを送信するコマンドクラス。
004 *
005 * <p>このクラスは、指定された通信先にOutputStreamを送信するためのコマンドを実装します。 ストリームを使用して、データを送信します。 送信先のURLはコンストラクタで指定されます。
006 * 送信先のURLは、HTTP POSTリクエストを使用してデータを送信します。 送信先のURLは、HTTPまたはHTTPSで始まる必要があります。
007 * 送信先のURLは、コンストラクタで指定されたURLに基づいて決定されます。
008 *
009 * <p>このクラスは、ストリーム変換のコマンドを実装するための抽象クラスを拡張しています。 ストリーム変換のコマンドは、ストリームを使用してデータを変換するためのものです。
010 *
011 * <p>レスポンスを受信してOutputStreamに書き込むことができます。
012 */
013package com.streamconverter.command.impl;
014
015import com.google.common.net.InetAddresses;
016import com.streamconverter.command.AbstractStreamCommand;
017import java.io.IOException;
018import java.io.InputStream;
019import java.io.OutputStream;
020import java.net.InetAddress;
021import java.net.URI;
022import java.net.URISyntaxException;
023import java.time.Duration;
024import java.util.Objects;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027import org.springframework.http.HttpHeaders;
028import org.springframework.http.MediaType;
029import org.springframework.http.client.reactive.ReactorClientHttpConnector;
030import org.springframework.web.reactive.function.BodyInserters;
031import org.springframework.web.reactive.function.client.WebClient;
032import reactor.netty.http.client.HttpClient;
033
034/** 指定された通信先にOutputStreamを送信するコマンドクラス。 */
035public class SendHttpCommand extends AbstractStreamCommand {
036
037  private static final Logger logger = LoggerFactory.getLogger(SendHttpCommand.class);
038
039  private final String url;
040  private final WebClient webClient;
041
042  /**
043   * デフォルトコンストラクタ
044   *
045   * @param url 送信先のURL
046   * @throws IllegalArgumentException URLが無効な場合
047   */
048  public SendHttpCommand(String url) {
049    this(url, createDefaultWebClient());
050  }
051
052  /**
053   * カスタム {@link WebClient} を使用するコンストラクタ。
054   *
055   * <p>主にテストや特殊なHTTPクライアント設定のために使用する。URLの検証は通常コンストラクタと同様に適用する。
056   *
057   * @param url 送信先のURL
058   * @param webClient 使用するWebClient
059   */
060  public SendHttpCommand(String url, WebClient webClient) {
061    super();
062    this.url = validateAndSanitizeUrl(url);
063    this.webClient = Objects.requireNonNull(webClient, "webClient must not be null");
064  }
065
066  private static WebClient createDefaultWebClient() {
067    // Simple HttpClient configuration for Netty 4.1.123.Final compatibility
068    HttpClient httpClient =
069        HttpClient.create()
070            .responseTimeout(Duration.ofSeconds(30))
071            .option(io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
072            .keepAlive(false); // Disable keep-alive to avoid connection pool issues
073
074    return WebClient.builder()
075        .clientConnector(new ReactorClientHttpConnector(httpClient))
076        .codecs(
077            configurer ->
078                // 1 MB limit for error response bodies (used by onStatus bodyToMono).
079                // The streaming response path (bodyToFlux) bypasses this buffer entirely.
080                configurer.defaultCodecs().maxInMemorySize(1024 * 1024))
081        .build();
082  }
083
084  /**
085   * URLの検証とサニタイゼーションを行う
086   *
087   * @param url 検証するURL
088   * @return 検証済みURL
089   * @throws IllegalArgumentException URLが無効な場合
090   */
091  private String validateAndSanitizeUrl(String url) {
092    Objects.requireNonNull(url, "URL cannot be null");
093
094    String trimmedUrl = url.trim();
095    if (trimmedUrl.isEmpty()) {
096      throw new IllegalArgumentException("URL cannot be empty");
097    }
098
099    try {
100      URI uri = new URI(trimmedUrl);
101      String scheme = uri.getScheme();
102
103      if (scheme == null) {
104        throw new IllegalArgumentException("URL must have a scheme (http or https)");
105      }
106
107      if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme)) {
108        throw new IllegalArgumentException("Only HTTP and HTTPS protocols are allowed");
109      }
110
111      String host = uri.getHost();
112      if (host == null || host.trim().isEmpty()) {
113        throw new IllegalArgumentException("URL must have a valid host");
114      }
115
116      // ローカルホストや内部IPアドレスへのアクセスを防ぐ
117      if (isLocalhost(host) || isPrivateIpAddress(host)) {
118        throw new IllegalArgumentException(
119            "Access to localhost or private IP addresses is not allowed");
120      }
121
122      return trimmedUrl;
123    } catch (URISyntaxException e) {
124      throw new IllegalArgumentException("Invalid URL format: " + e.getMessage(), e);
125    }
126  }
127
128  /** ローカルホストかどうかを判定する */
129  private boolean isLocalhost(String host) {
130    if (host == null) {
131      return false;
132    }
133
134    // Handle IPv6 addresses with brackets
135    String cleanHost =
136        host.startsWith("[") && host.endsWith("]") ? host.substring(1, host.length() - 1) : host;
137
138    return "localhost".equalsIgnoreCase(cleanHost)
139        || "127.0.0.1".equals(cleanHost)
140        || "::1".equals(cleanHost);
141  }
142
143  /** プライベートIPアドレスかどうかを判定する(Guava使用) */
144  private boolean isPrivateIpAddress(String host) {
145    try {
146      // GuavaのInetAddressesを使用してIPアドレスを解析
147      InetAddress address = InetAddresses.forString(host);
148      // RFC 1918準拠のプライベートアドレス判定
149      return address.isSiteLocalAddress() || address.isLoopbackAddress();
150    } catch (IllegalArgumentException e) {
151      // IPアドレス形式でない場合(ホスト名など)はfalseを返す
152      return false;
153    }
154  }
155
156  /**
157   * ストリームを指定されたURLに送信します。
158   *
159   * @param inputStream 入力ストリーム
160   * @param outputStream 出力ストリーム
161   * @throws IOException 入出力エラーが発生した場合
162   */
163  @Override
164  public void execute(InputStream inputStream, OutputStream outputStream)
165      throws IOException {
166    Objects.requireNonNull(inputStream, "inputStream must not be null");
167    Objects.requireNonNull(outputStream, "outputStream must not be null");
168
169    logger.info("Sending HTTP POST request to: {}", url);
170
171    try {
172      // Track total bytes written for better error reporting
173      final long[] totalBytesWritten = {0L};
174
175      // 大容量データに対応するため、ストリーミング処理を使用
176      // WebClientでストリーミングレスポンスを処理
177      webClient
178          .post()
179          .uri(url)
180          .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_OCTET_STREAM_VALUE)
181          .header(HttpHeaders.USER_AGENT, "StreamConverter/1.0")
182          .body(
183              BodyInserters.fromDataBuffers(
184                  org.springframework.core.io.buffer.DataBufferUtils.readInputStream(
185                      () -> inputStream,
186                      org.springframework.core.io.buffer.DefaultDataBufferFactory.sharedInstance,
187                      8192))) // 8KB chunks for memory efficiency
188          .retrieve()
189          .onStatus(
190              status -> !status.is2xxSuccessful(),
191              response ->
192                  response
193                      .bodyToMono(String.class)
194                      .defaultIfEmpty("")
195                      .map(
196                          errorBody ->
197                              new RuntimeException(
198                                  String.format(
199                                      "HTTP request failed: status=%d, url=%s, response=%s",
200                                      response.statusCode().value(), url, errorBody))))
201          .bodyToFlux(org.springframework.core.io.buffer.DataBuffer.class)
202          .timeout(Duration.ofMinutes(5)) // 大容量データ処理のため5分に延長
203          .doOnNext(
204              dataBuffer -> {
205                // Ensure DataBuffer is always released, even if write fails
206                try {
207                  // ストリーミング処理:8KBずつレスポンスを処理
208                  byte[] bytes = new byte[dataBuffer.readableByteCount()];
209                  dataBuffer.read(bytes);
210                  outputStream.write(bytes);
211                  totalBytesWritten[0] += bytes.length;
212                } catch (IOException e) {
213                  throw new RuntimeException(
214                      String.format(
215                          "Failed to write response data to output stream (url=%s, bytesWritten=%d)",
216                          url, totalBytesWritten[0]),
217                      e);
218                } finally {
219                  org.springframework.core.io.buffer.DataBufferUtils.release(dataBuffer);
220                }
221              })
222          .doOnComplete(
223              () -> {
224                try {
225                  outputStream.flush();
226                  logger.info("HTTP response streaming completed successfully");
227                } catch (IOException e) {
228                  throw new RuntimeException("Failed to flush output stream", e);
229                }
230              })
231          .blockLast(); // Intentionally synchronous: AbstractStreamCommand interface requires
232      // blocking execution
233      // for compatibility with existing command pipeline. Alternative: use subscribe()
234      // with CompletableFuture for true async, but would break command interface contract.
235
236    } catch (RuntimeException e) {
237      // WebClient error responses are wrapped in RuntimeException
238      String errorMessage = "HTTP request failed: " + url + " - " + e.getMessage();
239      logger.error(errorMessage, e);
240      throw new IOException(errorMessage, e);
241    }
242  }
243}