001package com.streamconverter;
002
003import java.io.IOException;
004import java.io.InputStream;
005import java.io.OutputStream;
006import java.io.PipedInputStream;
007import java.io.PipedOutputStream;
008import java.util.concurrent.atomic.AtomicBoolean;
009
010/**
011 * コマンド間の異常終了を伝播するためのPipedStreamペアラッパー。
012 *
013 * <p>いずれかのコマンドが異常終了した際に {@link #abort()} を呼ぶことで、 対向の read()/write() に {@link PipeAbortedException}
014 * を送出させる。 これにより、対向コマンドは「パイプが閉じた」という IO エラーではなく、 「対向コマンドが異常終了した」という型付きの例外を受け取って終了できる。
015 *
016 * <p>使用例:
017 *
018 * <pre>{@code
019 * AbortablePipedStream pipe = new AbortablePipedStream(bufferSize);
020 * // 前段コマンドには pipe.outputStream() を渡す
021 * // 後段コマンドには pipe.inputStream() を渡す
022 *
023 * // 前段コマンドが異常終了した場合:
024 * pipe.abort();
025 * // → 後段コマンドの read() が PipeAbortedException を投げる
026 * }</pre>
027 *
028 * @see PipeAbortedException
029 */
030public final class AbortablePipedStream implements AutoCloseable {
031
032  private final PipedOutputStream out;
033  private final PipedInputStream in;
034  private final AtomicBoolean aborted = new AtomicBoolean(false);
035  private final AtomicBoolean outputStreamIssued = new AtomicBoolean(false);
036  private final AtomicBoolean inputStreamIssued = new AtomicBoolean(false);
037
038  /**
039   * 指定バッファサイズで PipedStream ペアを生成する。
040   *
041   * @param bufferSize パイプバッファのサイズ(バイト)。1 以上であること
042   * @throws IllegalArgumentException bufferSize が 1 未満の場合
043   * @throws IOException PipedStream の生成に失敗した場合
044   */
045  public AbortablePipedStream(int bufferSize) throws IOException {
046    if (bufferSize < 1) {
047      throw new IllegalArgumentException("bufferSize must be >= 1, got: " + bufferSize);
048    }
049    this.out = new PipedOutputStream();
050    this.in = new PipedInputStream(out, bufferSize);
051  }
052
053  /**
054   * 書き込み側のラッパーを返す(前段コマンドに渡す)。
055   *
056   * <p>このパイプは書き込み側を1つのコマンドのみが使用することを想定しており、 2回目の呼び出しは {@link IllegalStateException} を投げる。
057   *
058   * @return 前段コマンド用の OutputStream
059   * @throws IllegalStateException 2回目以降の呼び出しの場合
060   */
061  public OutputStream outputStream() {
062    if (!outputStreamIssued.compareAndSet(false, true)) {
063      throw new IllegalStateException("outputStream() has already been called on this pipe");
064    }
065    return new AbortableOutputStream();
066  }
067
068  /**
069   * 読み込み側のラッパーを返す(後段コマンドに渡す)。
070   *
071   * <p>このパイプは読み込み側を1つのコマンドのみが使用することを想定しており、 2回目の呼び出しは {@link IllegalStateException} を投げる。
072   *
073   * @return 後段コマンド用の InputStream
074   * @throws IllegalStateException 2回目以降の呼び出しの場合
075   */
076  public InputStream inputStream() {
077    if (!inputStreamIssued.compareAndSet(false, true)) {
078      throw new IllegalStateException("inputStream() has already been called on this pipe");
079    }
080    return new AbortableInputStream();
081  }
082
083  /**
084   * このパイプを abort 済みとしてマークする。
085   *
086   * <p>以降の read()/write()/flush() 呼び出しが {@link PipeAbortedException} を投げるようになる。 パイプのクローズは {@link
087   * StreamConverter} の {@code closeResources()} が担うため、 このメソッドはフラグのセットのみを行う。
088   *
089   * <p>{@link #close()} の前後どちらで呼んでも安全。ただし {@link #close()} 後は read()/write() が呼ばれる機会がないため、abort
090   * フラグは実質的に効果を持たない。
091   */
092  public void abort() {
093    aborted.set(true);
094  }
095
096  /**
097   * パイプを正常にクローズする。
098   *
099   * <p>{@link AutoCloseable} の実装。abort フラグはセットしない。 {@link #abort()} の前後どちらで呼んでも安全。
100   */
101  @Override
102  public void close() throws IOException {
103    try {
104      out.close();
105    } catch (IOException ignored) {
106      // 既に閉じている場合は無視する
107    }
108    try {
109      in.close();
110    } catch (IOException ignored) {
111      // 既に閉じている場合は無視する
112    }
113  }
114
115  private void checkAborted() throws IOException {
116    if (aborted.get()) {
117      throw new PipeAbortedException();
118    }
119  }
120
121  private class AbortableOutputStream extends OutputStream {
122
123    @Override
124    public void write(int b) throws IOException {
125      checkAborted();
126      try {
127        out.write(b);
128      } catch (IOException e) {
129        checkAborted();
130        throw e;
131      }
132    }
133
134    @Override
135    public void write(byte[] b, int off, int len) throws IOException {
136      checkAborted();
137      try {
138        out.write(b, off, len);
139      } catch (IOException e) {
140        checkAborted();
141        throw e;
142      }
143    }
144
145    @Override
146    public void flush() throws IOException {
147      checkAborted();
148      try {
149        out.flush();
150      } catch (IOException e) {
151        checkAborted();
152        throw e;
153      }
154    }
155
156    @Override
157    public void close() throws IOException {
158      out.close();
159    }
160  }
161
162  private class AbortableInputStream extends InputStream {
163
164    @Override
165    public int read() throws IOException {
166      checkAborted();
167      try {
168        return in.read();
169      } catch (IOException e) {
170        checkAborted();
171        throw e;
172      }
173    }
174
175    @Override
176    public int read(byte[] b, int off, int len) throws IOException {
177      checkAborted();
178      try {
179        return in.read(b, off, len);
180      } catch (IOException e) {
181        checkAborted();
182        throw e;
183      }
184    }
185
186    @Override
187    public void close() throws IOException {
188      in.close();
189    }
190  }
191}