/* * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package jdk.incubator.http; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import org.testng.annotations.Test; import jdk.incubator.http.internal.common.SubscriberWrapper; @Test public class WrapperTest { static final int LO_PRI = 1; static final int HI_PRI = 2; static final int NUM_HI_PRI = 240; static final int BUFSIZE = 1016; static final int BUFSIZE_INT = BUFSIZE/4; static final int HI_PRI_FREQ = 40; static final int TOTAL = 10000; //static final int TOTAL = 500; final SubmissionPublisher> publisher; final SubscriberWrapper sub1, sub2, sub3; final ExecutorService executor = Executors.newCachedThreadPool(); volatile int hipricount = 0; void errorHandler(Flow.Subscriber> sub, Throwable t) { System.err.printf("Exception from %s : %s\n", sub.toString(), t.toString()); } public WrapperTest() { publisher = new SubmissionPublisher<>(executor, 600, (a, b) -> { errorHandler(a, b); }); CompletableFuture notif = new CompletableFuture<>(); LastSubscriber ls = new LastSubscriber(notif); sub1 = new Filter1(ls); sub2 = new Filter2(sub1); sub3 = new Filter2(sub2); } public class Filter2 extends SubscriberWrapper { Filter2(SubscriberWrapper wrapper) { super(wrapper); } // reverse the order of the bytes in each buffer public void incoming(List list, boolean complete) { List out = new LinkedList<>(); for (ByteBuffer inbuf : list) { int size = inbuf.remaining(); ByteBuffer outbuf = ByteBuffer.allocate(size); for (int i=size; i>0; i--) { byte b = inbuf.get(i-1); outbuf.put(b); } outbuf.flip(); out.add(outbuf); } if (complete) System.out.println("Filter2.complete"); outgoing(out, complete); } protected long windowUpdate(long currval) { return currval == 0 ? 1 : 0; } } volatile int filter1Calls = 0; // every third call we insert hi pri data ByteBuffer getHiPri(int val) { ByteBuffer buf = ByteBuffer.allocate(8); buf.putInt(HI_PRI); buf.putInt(val); buf.flip(); return buf; } volatile int hiPriAdded = 0; public class Filter1 extends SubscriberWrapper { Filter1(Flow.Subscriber> downstreamSubscriber) { super(); subscribe(downstreamSubscriber); } // Inserts up to NUM_HI_PRI hi priority buffers into flow protected void incoming(List in, boolean complete) { if ((++filter1Calls % HI_PRI_FREQ) == 0 && (hiPriAdded++ < NUM_HI_PRI)) { sub1.outgoing(getHiPri(hipricount++), false); } // pass data thru if (complete) System.out.println("Filter1.complete"); outgoing(in, complete); } protected long windowUpdate(long currval) { return currval == 0 ? 1 : 0; } } /** * Final subscriber in the chain. Compares the data sent by the original * publisher. */ static public class LastSubscriber implements Flow.Subscriber> { volatile Flow.Subscription subscription; volatile int hipriCounter=0; volatile int lopriCounter=0; final CompletableFuture cf; LastSubscriber(CompletableFuture cf) { this.cf = cf; } @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(50); // say } private void error(String...args) { StringBuilder sb = new StringBuilder(); for (String s : args) { sb.append(s); sb.append(' '); } String msg = sb.toString(); System.out.println("Error: " + msg); RuntimeException e = new RuntimeException(msg); cf.completeExceptionally(e); subscription.cancel(); // This is where we need a variant that include exception } private void check(ByteBuffer buf) { int type = buf.getInt(); if (type == HI_PRI) { // check next int is hi pri counter int c = buf.getInt(); if (c != hipriCounter) error("hi pri counter", Integer.toString(c), Integer.toString(hipriCounter)); hipriCounter++; } else { while (buf.hasRemaining()) { if (buf.getInt() != lopriCounter) error("lo pri counter", Integer.toString(lopriCounter)); lopriCounter++; } } } @Override public void onNext(List items) { for (ByteBuffer item : items) check(item); subscription.request(1); } @Override public void onError(Throwable throwable) { error(throwable.getMessage()); } @Override public void onComplete() { if (hipriCounter != NUM_HI_PRI) error("hi pri at end wrong", Integer.toString(hipriCounter), Integer.toString(NUM_HI_PRI)); else { System.out.println("LastSubscriber.complete"); cf.complete(null); // success } } } List getBuffer(int c) { ByteBuffer buf = ByteBuffer.allocate(BUFSIZE+4); buf.putInt(LO_PRI); for (int i=0; i completion = sub3.completion(); publisher.subscribe(sub3); // now submit a load of data int counter = 0; for (int i = 0; i < TOTAL; i++) { List bufs = getBuffer(counter); //if (i==2) //bufs.get(0).putInt(41, 1234); // error counter += BUFSIZE_INT; publisher.submit(bufs); //if (i % 1000 == 0) //Thread.sleep(1000); //if (i == 99) { //publisher.closeExceptionally(new RuntimeException("Test error")); //errorTest = true; //break; //} } if (!errorTest) { publisher.close(); } System.out.println("Publisher completed"); completion.join(); System.out.println("Subscribers completed ok"); } finally { executor.shutdownNow(); } } static void display(CompletableFuture cf) { System.out.print (cf); if (!cf.isDone()) return; try { cf.join(); // wont block } catch (Exception e) { System.out.println(" " + e); } } /* public static void main(String[] args) throws InterruptedException { WrapperTest test = new WrapperTest(); test.run(); } */ }