001package com.streamconverter.command.impl.json; 002 003import com.fasterxml.jackson.databind.JsonNode; 004import com.fasterxml.jackson.databind.ObjectMapper; 005import com.networknt.schema.JsonSchema; 006import com.networknt.schema.JsonSchemaFactory; 007import com.networknt.schema.SpecVersion; 008import com.networknt.schema.ValidationMessage; 009import com.streamconverter.StreamProcessingException; 010import com.streamconverter.command.ConsumerCommand; 011import java.io.File; 012import java.io.IOException; 013import java.io.InputStream; 014import java.util.Objects; 015import java.util.Set; 016import java.util.concurrent.atomic.AtomicBoolean; 017import java.util.concurrent.atomic.AtomicInteger; 018import org.jsfr.json.JsonSurfer; 019import org.jsfr.json.JsonSurferJackson; 020import org.slf4j.Logger; 021import org.slf4j.LoggerFactory; 022 023/** 024 * JsonSurferを使った完全ストリーミングJSON検証コマンド 025 * 026 * <p>このクラスは真のストリーミング処理でJSONを検証します。 従来のJSON Schema検証と組み合わせて、大容量データに対応しつつ詳細な検証を提供します。 027 * 028 * <p><strong>アプローチ:</strong><br> 029 * 1. JsonSurferによる高速ストリーミング事前検証(構造・必須フィールド確認)<br> 030 * 2. 事前検証通過時のみJSON Schema検証実行<br> 031 * 3. 任意サイズのデータを一定メモリで処理 032 * 033 * <p><strong>ライブラリ選択理由:</strong><br> 034 * JsonSurferを採用した理由: 035 * 036 * <ul> 037 * <li>✅ 完全ストリーミング処理(DOM構築なし) 038 * <li>✅ JsonPathサポートによる柔軟な検証ルール記述 039 * <li>✅ イベントドリブンでメモリ効率最大化 040 * <li>✅ Jackson統合で既存依存関係と整合 041 * </ul> 042 * 043 * <p><strong>他ライブラリを採用しなかった理由:</strong><br> 044 * 045 * <ul> 046 * <li><strong>StAXON:</strong> XML思考の強制、namespace問題、JSON型情報の欠如 047 * <li><strong>JSR 353:</strong> 低レベルAPI、JsonPathなし、ボイラープレート大量 048 * <li><strong>Gson JsonReader:</strong> プル解析のみ、状態管理必須、実装複雑化 049 * </ul> 050 * 051 * <p>使用例: 052 * 053 * <pre> 054 * JsonStreamingValidateCommand validator = new JsonStreamingValidateCommand("schema/user.json"); 055 * validator.consume(jsonInputStream); 056 * </pre> 057 */ 058public class JsonStreamingValidateCommand extends ConsumerCommand { 059 private static final Logger logger = LoggerFactory.getLogger(JsonStreamingValidateCommand.class); 060 061 private final String schemaPath; 062 private final ObjectMapper objectMapper; 063 private final JsonSchemaFactory schemaFactory; 064 private final JsonSurfer surfer; 065 private volatile JsonSchema cachedSchema; 066 067 /** 068 * コンストラクタ 069 * 070 * @param schemaPath JSONスキーマファイルのパス 071 * @throws IllegalArgumentException スキーマパスがnullまたは空の場合 072 * @throws StreamProcessingException スキーマファイルの読み込みに失敗した場合 073 */ 074 public JsonStreamingValidateCommand(String schemaPath) { 075 this.schemaPath = validateSchemaPath(schemaPath); 076 this.objectMapper = new ObjectMapper(); 077 this.schemaFactory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V7); 078 this.surfer = JsonSurferJackson.INSTANCE; 079 080 // パフォーマンス改善: 遅延読み込みによりコンストラクタでのI/O操作を回避 081 // スキーマの妥当性検証は最初の使用時に実行 082 } 083 084 /** スキーマパスの検証 */ 085 private String validateSchemaPath(String path) { 086 if (path == null) { 087 throw new IllegalArgumentException("Schema path cannot be null"); 088 } 089 String trimmedPath = path.trim(); 090 if (trimmedPath.isEmpty()) { 091 throw new IllegalArgumentException("Schema path cannot be empty"); 092 } 093 return trimmedPath; 094 } 095 096 /** 097 * JSONストリーミング検証を実行します 098 * 099 * @param inputStream 検証対象のJSONデータを含む入力ストリーム 100 * @throws IOException I/Oエラーが発生した場合 101 * @throws StreamProcessingException JSONバリデーションエラーが発生した場合 102 */ 103 @Override 104 public void consume(InputStream inputStream) throws IOException { 105 Objects.requireNonNull(inputStream, "InputStream cannot be null"); 106 107 logger.info("Starting streaming JSON validation with schema: {}", schemaPath); 108 109 try { 110 // 入力ストリームをバッファリングして再利用可能にする 111 byte[] inputBuffer; 112 try { 113 inputBuffer = inputStream.readAllBytes(); 114 } catch (IOException e) { 115 throw new StreamProcessingException("Failed to buffer input stream for validation", e); 116 } 117 118 // Phase 1: JsonSurferによる高速ストリーミング事前検証 119 try (java.io.ByteArrayInputStream streamingInputStream = 120 new java.io.ByteArrayInputStream(inputBuffer)) { 121 StreamingValidationResult streamingResult = 122 performStreamingValidation(streamingInputStream); 123 124 if (!streamingResult.isValid()) { 125 throw new StreamProcessingException( 126 String.format( 127 "JSON streaming validation failed: %s", streamingResult.getErrorMessage())); 128 } 129 130 logger.debug( 131 "Streaming validation passed ({} elements), proceeding to schema validation", 132 streamingResult.getElementCount()); 133 } 134 135 // Phase 2: 事前検証通過時のみJSON Schema検証を実行 136 try (java.io.ByteArrayInputStream schemaInputStream = 137 new java.io.ByteArrayInputStream(inputBuffer)) { 138 performSchemaValidation(schemaInputStream); 139 logger.info("JSON validation completed successfully (streaming + schema validation)"); 140 } 141 142 } catch (StreamProcessingException e) { 143 throw e; 144 } catch (Exception e) { 145 logger.error("JSON streaming validation failed: {}", e.getMessage(), e); 146 throw new StreamProcessingException( 147 String.format( 148 "JSON streaming validation failed - schema: %s, error: %s", 149 schemaPath, e.getMessage()), 150 e); 151 } 152 } 153 154 /** JsonSurferを使った完全ストリーミング検証 */ 155 private StreamingValidationResult performStreamingValidation(InputStream inputStream) { 156 AtomicBoolean isValid = new AtomicBoolean(true); 157 AtomicInteger elementCount = new AtomicInteger(0); 158 StringBuilder errorMessages = new StringBuilder(); 159 160 try { 161 // 基本構造検証: ルートオブジェクトまたは配列の存在確認 162 AtomicBoolean hasRootStructure = new AtomicBoolean(false); 163 164 surfer 165 .configBuilder() 166 // ルートレベルの検証 167 .bind( 168 "$", 169 (value, context) -> { 170 hasRootStructure.set(true); 171 logger.debug("Found root JSON structure"); 172 }) 173 // 配列要素の計測 174 .bind( 175 "$[*]", 176 (value, context) -> { 177 int count = elementCount.incrementAndGet(); 178 if (count % 1000 == 0) { 179 logger.debug("Processed {} array elements", count); 180 } 181 }) 182 // オブジェクトプロパティの基本検証例 183 .bind( 184 "$..id", 185 (value, context) -> { 186 if (value == null) { 187 isValid.set(false); 188 errorMessages.append("Found null id field; "); 189 logger.warn("Validation error: null id field"); 190 } 191 }) 192 .bind( 193 "$..name", 194 (value, context) -> { 195 if (value == null || value.toString().trim().isEmpty()) { 196 isValid.set(false); 197 errorMessages.append("Found empty name field; "); 198 logger.warn("Validation error: empty name field"); 199 } 200 }) 201 // JsonSurferのエラーハンドリングは別途try-catchで実装 202 .buildAndSurf(inputStream); 203 204 if (!hasRootStructure.get()) { 205 isValid.set(false); 206 errorMessages.append("No valid JSON root structure found; "); 207 } 208 209 logger.debug("Streaming validation completed - elements processed: {}", elementCount.get()); 210 211 } catch (Exception e) { 212 isValid.set(false); 213 errorMessages.append("Streaming validation exception: ").append(e.getMessage()); 214 logger.error("Exception during streaming validation", e); 215 } 216 217 return new StreamingValidationResult( 218 isValid.get(), errorMessages.toString(), elementCount.get()); 219 } 220 221 /** JSONスキーマを遅延読み込み(スレッドセーフ) */ 222 private JsonSchema loadSchema() throws StreamProcessingException { 223 if (cachedSchema != null) { 224 return cachedSchema; 225 } 226 227 synchronized (this) { 228 if (cachedSchema != null) { 229 return cachedSchema; 230 } 231 try { 232 File schemaFile = new File(schemaPath); 233 if (!schemaFile.exists()) { 234 throw new StreamProcessingException( 235 "Failed to load JSON schema: Schema file not found: " + schemaPath); 236 } 237 238 if (!schemaFile.canRead()) { 239 throw new StreamProcessingException( 240 "Failed to load JSON schema: Schema file is not readable: " + schemaPath); 241 } 242 243 JsonNode schemaNode; 244 try { 245 schemaNode = objectMapper.readTree(schemaFile); 246 } catch (Exception e) { 247 throw new StreamProcessingException( 248 "Failed to load JSON schema: Invalid schema file format: " + schemaPath, e); 249 } 250 251 if (schemaNode == null) { 252 throw new StreamProcessingException( 253 "Failed to load JSON schema: Schema file is empty: " + schemaPath); 254 } 255 256 cachedSchema = schemaFactory.getSchema(schemaNode); 257 return cachedSchema; 258 259 } catch (StreamProcessingException e) { 260 throw e; 261 } catch (Exception e) { 262 throw new StreamProcessingException("Failed to load JSON schema from: " + schemaPath, e); 263 } 264 } 265 } 266 267 /** JSON Schema検証を実行 */ 268 private void performSchemaValidation(InputStream inputStream) throws StreamProcessingException { 269 try { 270 JsonSchema schema = loadSchema(); 271 JsonNode jsonNode = objectMapper.readTree(inputStream); 272 273 if (jsonNode == null) { 274 throw new StreamProcessingException("Failed to parse JSON for schema validation"); 275 } 276 277 Set<ValidationMessage> validationMessages = schema.validate(jsonNode); 278 279 if (!validationMessages.isEmpty()) { 280 StringBuilder errorBuilder = new StringBuilder(); 281 errorBuilder 282 .append("JSON schema validation failed with ") 283 .append(validationMessages.size()) 284 .append(" validation errors:"); 285 286 int errorCount = 0; 287 for (ValidationMessage message : validationMessages) { 288 errorBuilder.append("\n ").append(++errorCount).append(". "); 289 errorBuilder.append("Path: ").append(message.getInstanceLocation()); 290 errorBuilder.append(" - ").append(message.getMessage()); 291 } 292 293 throw new StreamProcessingException(errorBuilder.toString()); 294 } 295 296 logger.debug("JSON schema validation completed successfully"); 297 298 } catch (StreamProcessingException e) { 299 throw e; 300 } catch (Exception e) { 301 throw new StreamProcessingException("JSON schema validation failed: " + e.getMessage(), e); 302 } 303 } 304 305 /** 306 * スキーマパスを取得 307 * 308 * @return スキーマファイルのパス 309 */ 310 public String getSchemaPath() { 311 return schemaPath; 312 } 313 314 /** ストリーミング検証結果を保持するクラス */ 315 private static class StreamingValidationResult { 316 private final boolean valid; 317 private final String errorMessage; 318 private final int elementCount; 319 320 public StreamingValidationResult(boolean valid, String errorMessage, int elementCount) { 321 this.valid = valid; 322 this.errorMessage = errorMessage; 323 this.elementCount = elementCount; 324 } 325 326 public boolean isValid() { 327 return valid; 328 } 329 330 public String getErrorMessage() { 331 return errorMessage; 332 } 333 334 public int getElementCount() { 335 return elementCount; 336 } 337 } 338}