001package com.streamconverter; 002 003import java.io.IOException; 004import java.io.InputStream; 005import java.io.OutputStream; 006import java.io.PipedInputStream; 007import java.io.PipedOutputStream; 008import java.util.concurrent.atomic.AtomicBoolean; 009 010/** 011 * コマンド間の異常終了を伝播するためのPipedStreamペアラッパー。 012 * 013 * <p>いずれかのコマンドが異常終了した際に {@link #abort()} を呼ぶことで、 対向の read()/write() に {@link PipeAbortedException} 014 * を送出させる。 これにより、対向コマンドは「パイプが閉じた」という IO エラーではなく、 「対向コマンドが異常終了した」という型付きの例外を受け取って終了できる。 015 * 016 * <p>使用例: 017 * 018 * <pre>{@code 019 * AbortablePipedStream pipe = new AbortablePipedStream(bufferSize); 020 * // 前段コマンドには pipe.outputStream() を渡す 021 * // 後段コマンドには pipe.inputStream() を渡す 022 * 023 * // 前段コマンドが異常終了した場合: 024 * pipe.abort(); 025 * // → 後段コマンドの read() が PipeAbortedException を投げる 026 * }</pre> 027 * 028 * @see PipeAbortedException 029 */ 030public final class AbortablePipedStream implements AutoCloseable { 031 032 private final PipedOutputStream out; 033 private final PipedInputStream in; 034 private final AtomicBoolean aborted = new AtomicBoolean(false); 035 private final AtomicBoolean outputStreamIssued = new AtomicBoolean(false); 036 private final AtomicBoolean inputStreamIssued = new AtomicBoolean(false); 037 038 /** 039 * 指定バッファサイズで PipedStream ペアを生成する。 040 * 041 * @param bufferSize パイプバッファのサイズ(バイト)。1 以上であること 042 * @throws IllegalArgumentException bufferSize が 1 未満の場合 043 * @throws IOException PipedStream の生成に失敗した場合 044 */ 045 public AbortablePipedStream(int bufferSize) throws IOException { 046 if (bufferSize < 1) { 047 throw new IllegalArgumentException("bufferSize must be >= 1, got: " + bufferSize); 048 } 049 this.out = new PipedOutputStream(); 050 this.in = new PipedInputStream(out, bufferSize); 051 } 052 053 /** 054 * 書き込み側のラッパーを返す(前段コマンドに渡す)。 055 * 056 * <p>このパイプは書き込み側を1つのコマンドのみが使用することを想定しており、 2回目の呼び出しは {@link IllegalStateException} を投げる。 057 * 058 * @return 前段コマンド用の OutputStream 059 * @throws IllegalStateException 2回目以降の呼び出しの場合 060 */ 061 public OutputStream outputStream() { 062 if (!outputStreamIssued.compareAndSet(false, true)) { 063 throw new IllegalStateException("outputStream() has already been called on this pipe"); 064 } 065 return new AbortableOutputStream(); 066 } 067 068 /** 069 * 読み込み側のラッパーを返す(後段コマンドに渡す)。 070 * 071 * <p>このパイプは読み込み側を1つのコマンドのみが使用することを想定しており、 2回目の呼び出しは {@link IllegalStateException} を投げる。 072 * 073 * @return 後段コマンド用の InputStream 074 * @throws IllegalStateException 2回目以降の呼び出しの場合 075 */ 076 public InputStream inputStream() { 077 if (!inputStreamIssued.compareAndSet(false, true)) { 078 throw new IllegalStateException("inputStream() has already been called on this pipe"); 079 } 080 return new AbortableInputStream(); 081 } 082 083 /** 084 * このパイプを abort 済みとしてマークする。 085 * 086 * <p>以降の read()/write()/flush() 呼び出しが {@link PipeAbortedException} を投げるようになる。 パイプのクローズは {@link 087 * StreamConverter} の {@code closeResources()} が担うため、 このメソッドはフラグのセットのみを行う。 088 * 089 * <p>{@link #close()} の前後どちらで呼んでも安全。ただし {@link #close()} 後は read()/write() が呼ばれる機会がないため、abort 090 * フラグは実質的に効果を持たない。 091 */ 092 public void abort() { 093 aborted.set(true); 094 } 095 096 /** 097 * パイプを正常にクローズする。 098 * 099 * <p>{@link AutoCloseable} の実装。abort フラグはセットしない。 {@link #abort()} の前後どちらで呼んでも安全。 100 */ 101 @Override 102 public void close() throws IOException { 103 try { 104 out.close(); 105 } catch (IOException ignored) { 106 // 既に閉じている場合は無視する 107 } 108 try { 109 in.close(); 110 } catch (IOException ignored) { 111 // 既に閉じている場合は無視する 112 } 113 } 114 115 private void checkAborted() throws IOException { 116 if (aborted.get()) { 117 throw new PipeAbortedException(); 118 } 119 } 120 121 private class AbortableOutputStream extends OutputStream { 122 123 @Override 124 public void write(int b) throws IOException { 125 checkAborted(); 126 try { 127 out.write(b); 128 } catch (IOException e) { 129 checkAborted(); 130 throw e; 131 } 132 } 133 134 @Override 135 public void write(byte[] b, int off, int len) throws IOException { 136 checkAborted(); 137 try { 138 out.write(b, off, len); 139 } catch (IOException e) { 140 checkAborted(); 141 throw e; 142 } 143 } 144 145 @Override 146 public void flush() throws IOException { 147 checkAborted(); 148 try { 149 out.flush(); 150 } catch (IOException e) { 151 checkAborted(); 152 throw e; 153 } 154 } 155 156 @Override 157 public void close() throws IOException { 158 out.close(); 159 } 160 } 161 162 private class AbortableInputStream extends InputStream { 163 164 @Override 165 public int read() throws IOException { 166 checkAborted(); 167 try { 168 return in.read(); 169 } catch (IOException e) { 170 checkAborted(); 171 throw e; 172 } 173 } 174 175 @Override 176 public int read(byte[] b, int off, int len) throws IOException { 177 checkAborted(); 178 try { 179 return in.read(b, off, len); 180 } catch (IOException e) { 181 checkAborted(); 182 throw e; 183 } 184 } 185 186 @Override 187 public void close() throws IOException { 188 in.close(); 189 } 190 } 191}