001/**
002 * Copyright (c) 2023, Stream Converter Project All rights reserved.
003 * 指定された通信先にOutputStreamを送信するコマンドクラス。
004 *
005 * <p>このクラスは、指定された通信先にOutputStreamを送信するためのコマンドを実装します。 ストリームを使用して、データを送信します。 送信先のURLはコンストラクタで指定されます。
006 * 送信先のURLは、HTTP POSTリクエストを使用してデータを送信します。 送信先のURLは、HTTPまたはHTTPSで始まる必要があります。
007 * 送信先のURLは、コンストラクタで指定されたURLに基づいて決定されます。
008 *
009 * <p>このクラスは、ストリーム変換のコマンドを実装するための抽象クラスを拡張しています。 ストリーム変換のコマンドは、ストリームを使用してデータを変換するためのものです。
010 *
011 * <p>レスポンスを受信してOutputStreamに書き込むことができます。
012 */
013package com.streamconverter.command.impl;
014
015import com.google.common.net.InetAddresses;
016import com.streamconverter.command.AbstractStreamCommand;
017import java.io.IOException;
018import java.io.InputStream;
019import java.io.OutputStream;
020import java.net.InetAddress;
021import java.net.URI;
022import java.net.URISyntaxException;
023import java.time.Duration;
024import java.util.Objects;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027import org.springframework.http.HttpHeaders;
028import org.springframework.http.MediaType;
029import org.springframework.http.client.reactive.ReactorClientHttpConnector;
030import org.springframework.web.reactive.function.BodyInserters;
031import org.springframework.web.reactive.function.client.WebClient;
032import reactor.netty.http.client.HttpClient;
033
034/** 指定された通信先にOutputStreamを送信するコマンドクラス。 */
035public class SendHttpCommand extends AbstractStreamCommand {
036
037  private static final Logger logger = LoggerFactory.getLogger(SendHttpCommand.class);
038
039  private String url;
040  private final WebClient webClient;
041
042  /**
043   * デフォルトコンストラクタ
044   *
045   * @param url 送信先のURL
046   * @throws IllegalArgumentException URLが無効な場合
047   */
048  public SendHttpCommand(String url) {
049    super();
050    this.url = validateAndSanitizeUrl(url);
051
052    // Simple HttpClient configuration for Netty 4.1.123.Final compatibility
053    HttpClient httpClient =
054        HttpClient.create()
055            .responseTimeout(Duration.ofSeconds(30))
056            .option(io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
057            .keepAlive(false); // Disable keep-alive to avoid connection pool issues
058
059    this.webClient =
060        WebClient.builder()
061            .clientConnector(new ReactorClientHttpConnector(httpClient))
062            .codecs(
063                configurer ->
064                    configurer.defaultCodecs().maxInMemorySize(-1)) // Unlimited for streaming
065            .build();
066  }
067
068  /**
069   * URLの検証とサニタイゼーションを行う
070   *
071   * @param url 検証するURL
072   * @return 検証済みURL
073   * @throws IllegalArgumentException URLが無効な場合
074   */
075  private String validateAndSanitizeUrl(String url) {
076    Objects.requireNonNull(url, "URL cannot be null");
077
078    String trimmedUrl = url.trim();
079    if (trimmedUrl.isEmpty()) {
080      throw new IllegalArgumentException("URL cannot be empty");
081    }
082
083    try {
084      URI uri = new URI(trimmedUrl);
085      String scheme = uri.getScheme();
086
087      if (scheme == null) {
088        throw new IllegalArgumentException("URL must have a scheme (http or https)");
089      }
090
091      if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme)) {
092        throw new IllegalArgumentException("Only HTTP and HTTPS protocols are allowed");
093      }
094
095      String host = uri.getHost();
096      if (host == null || host.trim().isEmpty()) {
097        throw new IllegalArgumentException("URL must have a valid host");
098      }
099
100      // ローカルホストや内部IPアドレスへのアクセスを防ぐ
101      if (isLocalhost(host) || isPrivateIpAddress(host)) {
102        throw new IllegalArgumentException(
103            "Access to localhost or private IP addresses is not allowed");
104      }
105
106      return trimmedUrl;
107    } catch (URISyntaxException e) {
108      throw new IllegalArgumentException("Invalid URL format: " + e.getMessage(), e);
109    }
110  }
111
112  /** ローカルホストかどうかを判定する */
113  private boolean isLocalhost(String host) {
114    if (host == null) {
115      return false;
116    }
117
118    // Handle IPv6 addresses with brackets
119    String cleanHost =
120        host.startsWith("[") && host.endsWith("]") ? host.substring(1, host.length() - 1) : host;
121
122    return "localhost".equalsIgnoreCase(cleanHost)
123        || "127.0.0.1".equals(cleanHost)
124        || "::1".equals(cleanHost);
125  }
126
127  /** プライベートIPアドレスかどうかを判定する(Guava使用) */
128  private boolean isPrivateIpAddress(String host) {
129    try {
130      // GuavaのInetAddressesを使用してIPアドレスを解析
131      InetAddress address = InetAddresses.forString(host);
132      // RFC 1918準拠のプライベートアドレス判定
133      return address.isSiteLocalAddress() || address.isLoopbackAddress();
134    } catch (IllegalArgumentException e) {
135      // IPアドレス形式でない場合(ホスト名など)はfalseを返す
136      return false;
137    }
138  }
139
140  /**
141   * ストリームを指定されたURLに送信します。
142   *
143   * @param inputStream 入力ストリーム
144   * @param outputStream 出力ストリーム
145   * @throws IOException 入出力エラーが発生した場合
146   */
147  @Override
148  protected void executeInternal(InputStream inputStream, OutputStream outputStream)
149      throws IOException {
150    Objects.requireNonNull(inputStream, "inputStream must not be null");
151    Objects.requireNonNull(outputStream, "outputStream must not be null");
152
153    logger.info("Sending HTTP POST request to: {}", url);
154
155    try {
156      // Track total bytes written for better error reporting
157      final long[] totalBytesWritten = {0L};
158
159      // 大容量データに対応するため、ストリーミング処理を使用
160      // WebClientでストリーミングレスポンスを処理
161      webClient
162          .post()
163          .uri(url)
164          .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_OCTET_STREAM_VALUE)
165          .header(HttpHeaders.USER_AGENT, "StreamConverter/1.0")
166          .body(
167              BodyInserters.fromDataBuffers(
168                  org.springframework.core.io.buffer.DataBufferUtils.readInputStream(
169                      () -> inputStream,
170                      org.springframework.core.io.buffer.DefaultDataBufferFactory.sharedInstance,
171                      8192))) // 8KB chunks for memory efficiency
172          .retrieve()
173          .onStatus(
174              status -> !status.is2xxSuccessful(),
175              response ->
176                  response
177                      .bodyToMono(String.class)
178                      .defaultIfEmpty("")
179                      .map(
180                          errorBody ->
181                              new RuntimeException(
182                                  String.format(
183                                      "HTTP request failed: status=%d, url=%s, response=%s",
184                                      response.statusCode().value(), url, errorBody))))
185          .bodyToFlux(org.springframework.core.io.buffer.DataBuffer.class)
186          .timeout(Duration.ofMinutes(5)) // 大容量データ処理のため5分に延長
187          .doOnNext(
188              dataBuffer -> {
189                // Ensure DataBuffer is always released, even if write fails
190                try {
191                  // ストリーミング処理:8KBずつレスポンスを処理
192                  byte[] bytes = new byte[dataBuffer.readableByteCount()];
193                  dataBuffer.read(bytes);
194                  outputStream.write(bytes);
195                  totalBytesWritten[0] += bytes.length;
196                } catch (IOException e) {
197                  throw new RuntimeException(
198                      String.format(
199                          "Failed to write response data to output stream (url=%s, bytesWritten=%d)",
200                          url, totalBytesWritten[0]),
201                      e);
202                } finally {
203                  org.springframework.core.io.buffer.DataBufferUtils.release(dataBuffer);
204                }
205              })
206          .doOnComplete(
207              () -> {
208                try {
209                  outputStream.flush();
210                  logger.info("HTTP response streaming completed successfully");
211                } catch (IOException e) {
212                  throw new RuntimeException("Failed to flush output stream", e);
213                }
214              })
215          .blockLast(); // Intentionally synchronous: AbstractStreamCommand interface requires
216      // blocking execution
217      // for compatibility with existing command pipeline. Alternative: use subscribe()
218      // with CompletableFuture for true async, but would break command interface contract.
219
220    } catch (RuntimeException e) {
221      // WebClient error responses are wrapped in RuntimeException
222      String errorMessage = "HTTP request failed: " + url + " - " + e.getMessage();
223      logger.error(errorMessage, e);
224      throw new IOException(errorMessage, e);
225    } catch (Exception e) {
226      String errorMessage = "HTTP request failed: " + url;
227      logger.error(errorMessage, e);
228      throw new IOException(errorMessage, e);
229    }
230  }
231}