1 /* 2 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 package jdk.incubator.http; 25 26 import java.nio.ByteBuffer; 27 import java.util.LinkedList; 28 import java.util.List; 29 import java.util.concurrent.*; 30 import java.util.concurrent.atomic.*; 31 import org.testng.annotations.Test; 32 import jdk.incubator.http.internal.common.SubscriberWrapper; 33 34 @Test 35 public class WrapperTest { 36 static final int LO_PRI = 1; 37 static final int HI_PRI = 2; 38 static final int NUM_HI_PRI = 240; 39 static final int BUFSIZE = 1016; 40 static final int BUFSIZE_INT = BUFSIZE/4; 41 static final int HI_PRI_FREQ = 40; 42 43 static final int TOTAL = 10000; 44 //static final int TOTAL = 500; 45 46 final SubmissionPublisher<List<ByteBuffer>> publisher; 47 final SubscriberWrapper sub1, sub2, sub3; 48 final ExecutorService executor = Executors.newCachedThreadPool(); 49 volatile int hipricount = 0; 50 51 void errorHandler(Flow.Subscriber<? super List<ByteBuffer>> sub, Throwable t) { 52 System.err.printf("Exception from %s : %s\n", sub.toString(), t.toString()); 53 } 54 55 public WrapperTest() { 56 publisher = new SubmissionPublisher<>(executor, 600, 57 (a, b) -> { 58 errorHandler(a, b); 59 }); 60 61 CompletableFuture<Void> notif = new CompletableFuture<>(); 62 LastSubscriber ls = new LastSubscriber(notif); 63 sub1 = new Filter1(ls); 64 sub2 = new Filter2(sub1); 65 sub3 = new Filter2(sub2); 66 } 67 68 public class Filter2 extends SubscriberWrapper { 69 Filter2(SubscriberWrapper wrapper) { 70 super(wrapper); 71 } 72 73 // reverse the order of the bytes in each buffer 74 public void incoming(List<ByteBuffer> list, boolean complete) { 75 List<ByteBuffer> out = new LinkedList<>(); 76 for (ByteBuffer inbuf : list) { 77 int size = inbuf.remaining(); 78 ByteBuffer outbuf = ByteBuffer.allocate(size); 79 for (int i=size; i>0; i--) { 80 byte b = inbuf.get(i-1); 81 outbuf.put(b); 82 } 83 outbuf.flip(); 84 out.add(outbuf); 85 } 86 if (complete) System.out.println("Filter2.complete"); 87 outgoing(out, complete); 88 } 89 90 protected long windowUpdate(long currval) { 91 return currval == 0 ? 1 : 0; 92 } 93 } 94 95 volatile int filter1Calls = 0; // every third call we insert hi pri data 96 97 ByteBuffer getHiPri(int val) { 98 ByteBuffer buf = ByteBuffer.allocate(8); 99 buf.putInt(HI_PRI); 100 buf.putInt(val); 101 buf.flip(); 102 return buf; 103 } 104 105 volatile int hiPriAdded = 0; 106 107 public class Filter1 extends SubscriberWrapper { 108 Filter1(Flow.Subscriber<List<ByteBuffer>> downstreamSubscriber) 109 { 110 super(); 111 subscribe(downstreamSubscriber); 112 } 113 114 // Inserts up to NUM_HI_PRI hi priority buffers into flow 115 protected void incoming(List<ByteBuffer> in, boolean complete) { 116 if ((++filter1Calls % HI_PRI_FREQ) == 0 && (hiPriAdded++ < NUM_HI_PRI)) { 117 sub1.outgoing(getHiPri(hipricount++), false); 118 } 119 // pass data thru 120 if (complete) System.out.println("Filter1.complete"); 121 outgoing(in, complete); 122 } 123 124 protected long windowUpdate(long currval) { 125 return currval == 0 ? 1 : 0; 126 } 127 } 128 129 /** 130 * Final subscriber in the chain. Compares the data sent by the original 131 * publisher. 132 */ 133 static public class LastSubscriber implements Flow.Subscriber<List<ByteBuffer>> { 134 volatile Flow.Subscription subscription; 135 volatile int hipriCounter=0; 136 volatile int lopriCounter=0; 137 final CompletableFuture<Void> cf; 138 139 LastSubscriber(CompletableFuture<Void> cf) { 140 this.cf = cf; 141 } 142 143 @Override 144 public void onSubscribe(Flow.Subscription subscription) { 145 this.subscription = subscription; 146 subscription.request(50); // say 147 } 148 149 private void error(String...args) { 150 StringBuilder sb = new StringBuilder(); 151 for (String s : args) { 152 sb.append(s); 153 sb.append(' '); 154 } 155 String msg = sb.toString(); 156 System.out.println("Error: " + msg); 157 RuntimeException e = new RuntimeException(msg); 158 cf.completeExceptionally(e); 159 subscription.cancel(); // This is where we need a variant that include exception 160 } 161 162 private void check(ByteBuffer buf) { 163 int type = buf.getInt(); 164 if (type == HI_PRI) { 165 // check next int is hi pri counter 166 int c = buf.getInt(); 167 if (c != hipriCounter) 168 error("hi pri counter", Integer.toString(c), Integer.toString(hipriCounter)); 169 hipriCounter++; 170 } else { 171 while (buf.hasRemaining()) { 172 if (buf.getInt() != lopriCounter) 173 error("lo pri counter", Integer.toString(lopriCounter)); 174 lopriCounter++; 175 } 176 } 177 } 178 179 @Override 180 public void onNext(List<ByteBuffer> items) { 181 for (ByteBuffer item : items) 182 check(item); 183 subscription.request(1); 184 } 185 186 @Override 187 public void onError(Throwable throwable) { 188 error(throwable.getMessage()); 189 } 190 191 @Override 192 public void onComplete() { 193 if (hipriCounter != NUM_HI_PRI) 194 error("hi pri at end wrong", Integer.toString(hipriCounter), Integer.toString(NUM_HI_PRI)); 195 else { 196 System.out.println("LastSubscriber.complete"); 197 cf.complete(null); // success 198 } 199 } 200 } 201 202 List<ByteBuffer> getBuffer(int c) { 203 ByteBuffer buf = ByteBuffer.allocate(BUFSIZE+4); 204 buf.putInt(LO_PRI); 205 for (int i=0; i<BUFSIZE_INT; i++) { 206 buf.putInt(c++); 207 } 208 buf.flip(); 209 return List.of(buf); 210 } 211 212 boolean errorTest = false; 213 214 @Test 215 public void run() throws InterruptedException { 216 try { 217 CompletableFuture<Void> completion = sub3.completion(); 218 publisher.subscribe(sub3); 219 // now submit a load of data 220 int counter = 0; 221 for (int i = 0; i < TOTAL; i++) { 222 List<ByteBuffer> bufs = getBuffer(counter); 223 //if (i==2) 224 //bufs.get(0).putInt(41, 1234); // error 225 counter += BUFSIZE_INT; 226 publisher.submit(bufs); 227 //if (i % 1000 == 0) 228 //Thread.sleep(1000); 229 //if (i == 99) { 230 //publisher.closeExceptionally(new RuntimeException("Test error")); 231 //errorTest = true; 232 //break; 233 //} 234 } 235 if (!errorTest) { 236 publisher.close(); 237 } 238 System.out.println("Publisher completed"); 239 completion.join(); 240 System.out.println("Subscribers completed ok"); 241 } finally { 242 executor.shutdownNow(); 243 } 244 } 245 246 static void display(CompletableFuture<?> cf) { 247 System.out.print (cf); 248 if (!cf.isDone()) 249 return; 250 try { 251 cf.join(); // wont block 252 } catch (Exception e) { 253 System.out.println(" " + e); 254 } 255 } 256 257 /* 258 public static void main(String[] args) throws InterruptedException { 259 WrapperTest test = new WrapperTest(); 260 test.run(); 261 } 262 */ 263 }