1 /*
   2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 package jdk.incubator.http;
  25 
  26 import java.nio.ByteBuffer;
  27 import java.util.LinkedList;
  28 import java.util.List;
  29 import java.util.concurrent.*;
  30 import java.util.concurrent.atomic.*;
  31 import org.testng.annotations.Test;
  32 import jdk.incubator.http.internal.common.SubscriberWrapper;
  33 
  34 @Test
  35 public class WrapperTest {
  36     static final int LO_PRI = 1;
  37     static final int HI_PRI = 2;
  38     static final int NUM_HI_PRI = 240;
  39     static final int BUFSIZE = 1016;
  40     static final int BUFSIZE_INT = BUFSIZE/4;
  41     static final int HI_PRI_FREQ = 40;
  42 
  43     static final int TOTAL = 10000;
  44     //static final int TOTAL = 500;
  45 
  46     final SubmissionPublisher<List<ByteBuffer>> publisher;
  47     final SubscriberWrapper sub1, sub2, sub3;
  48     final ExecutorService executor = Executors.newCachedThreadPool();
  49     volatile int hipricount = 0;
  50 
  51     void errorHandler(Flow.Subscriber<? super List<ByteBuffer>> sub, Throwable t) {
  52         System.err.printf("Exception from %s : %s\n", sub.toString(), t.toString());
  53     }
  54 
  55     public WrapperTest() {
  56         publisher = new SubmissionPublisher<>(executor, 600,
  57                 (a, b) -> {
  58                     errorHandler(a, b);
  59                 });
  60 
  61         CompletableFuture<Void> notif = new CompletableFuture<>();
  62         LastSubscriber ls = new LastSubscriber(notif);
  63         sub1 = new Filter1(ls);
  64         sub2 = new Filter2(sub1);
  65         sub3 = new Filter2(sub2);
  66     }
  67 
  68     public class Filter2 extends SubscriberWrapper {
  69         Filter2(SubscriberWrapper wrapper) {
  70             super(wrapper);
  71         }
  72 
  73         // reverse the order of the bytes in each buffer
  74         public void incoming(List<ByteBuffer> list, boolean complete) {
  75             List<ByteBuffer> out = new LinkedList<>();
  76             for (ByteBuffer inbuf : list) {
  77                 int size = inbuf.remaining();
  78                 ByteBuffer outbuf = ByteBuffer.allocate(size);
  79                 for (int i=size; i>0; i--) {
  80                     byte b = inbuf.get(i-1);
  81                     outbuf.put(b);
  82                 }
  83                 outbuf.flip();
  84                 out.add(outbuf);
  85             }
  86             if (complete) System.out.println("Filter2.complete");
  87             outgoing(out, complete);
  88         }
  89 
  90         protected long windowUpdate(long currval) {
  91             return currval == 0 ? 1 : 0;
  92         }
  93     }
  94 
  95     volatile int filter1Calls = 0; // every third call we insert hi pri data
  96 
  97     ByteBuffer getHiPri(int val) {
  98         ByteBuffer buf = ByteBuffer.allocate(8);
  99         buf.putInt(HI_PRI);
 100         buf.putInt(val);
 101         buf.flip();
 102         return buf;
 103     }
 104 
 105     volatile int hiPriAdded = 0;
 106 
 107     public class Filter1 extends SubscriberWrapper {
 108         Filter1(Flow.Subscriber<List<ByteBuffer>> downstreamSubscriber)
 109         {
 110             super();
 111             subscribe(downstreamSubscriber);
 112         }
 113 
 114         // Inserts up to NUM_HI_PRI hi priority buffers into flow
 115         protected void incoming(List<ByteBuffer> in, boolean complete) {
 116             if ((++filter1Calls % HI_PRI_FREQ) == 0 && (hiPriAdded++ < NUM_HI_PRI)) {
 117                 sub1.outgoing(getHiPri(hipricount++), false);
 118             }
 119             // pass data thru
 120             if (complete) System.out.println("Filter1.complete");
 121             outgoing(in, complete);
 122         }
 123 
 124         protected long windowUpdate(long currval) {
 125             return currval == 0 ? 1 : 0;
 126         }
 127     }
 128 
 129     /**
 130      * Final subscriber in the chain. Compares the data sent by the original
 131      * publisher.
 132      */
 133     static public class LastSubscriber implements Flow.Subscriber<List<ByteBuffer>> {
 134         volatile Flow.Subscription subscription;
 135         volatile int hipriCounter=0;
 136         volatile int lopriCounter=0;
 137         final CompletableFuture<Void> cf;
 138 
 139         LastSubscriber(CompletableFuture<Void> cf) {
 140             this.cf = cf;
 141         }
 142 
 143         @Override
 144         public void onSubscribe(Flow.Subscription subscription) {
 145             this.subscription = subscription;
 146             subscription.request(50); // say
 147         }
 148 
 149         private void error(String...args) {
 150             StringBuilder sb = new StringBuilder();
 151             for (String s : args) {
 152                 sb.append(s);
 153                 sb.append(' ');
 154             }
 155             String msg = sb.toString();
 156             System.out.println("Error: " + msg);
 157             RuntimeException e = new RuntimeException(msg);
 158             cf.completeExceptionally(e);
 159             subscription.cancel(); // This is where we need a variant that include exception
 160         }
 161 
 162         private void check(ByteBuffer buf) {
 163             int type = buf.getInt();
 164             if (type == HI_PRI) {
 165                 // check next int is hi pri counter
 166                 int c = buf.getInt();
 167                 if (c != hipriCounter)
 168                     error("hi pri counter", Integer.toString(c), Integer.toString(hipriCounter));
 169                 hipriCounter++;
 170             } else {
 171                 while (buf.hasRemaining()) {
 172                     if (buf.getInt() != lopriCounter)
 173                         error("lo pri counter", Integer.toString(lopriCounter));
 174                     lopriCounter++;
 175                 }
 176             }
 177         }
 178 
 179         @Override
 180         public void onNext(List<ByteBuffer> items) {
 181             for (ByteBuffer item : items)
 182                 check(item);
 183             subscription.request(1);
 184         }
 185 
 186         @Override
 187         public void onError(Throwable throwable) {
 188             error(throwable.getMessage());
 189         }
 190 
 191         @Override
 192         public void onComplete() {
 193             if (hipriCounter != NUM_HI_PRI)
 194                 error("hi pri at end wrong", Integer.toString(hipriCounter), Integer.toString(NUM_HI_PRI));
 195             else {
 196                 System.out.println("LastSubscriber.complete");
 197                 cf.complete(null); // success
 198             }
 199         }
 200     }
 201 
 202     List<ByteBuffer> getBuffer(int c) {
 203         ByteBuffer buf = ByteBuffer.allocate(BUFSIZE+4);
 204         buf.putInt(LO_PRI);
 205         for (int i=0; i<BUFSIZE_INT; i++) {
 206             buf.putInt(c++);
 207         }
 208         buf.flip();
 209         return List.of(buf);
 210     }
 211 
 212     boolean errorTest = false;
 213 
 214     @Test
 215     public void run() throws InterruptedException {
 216         try {
 217             CompletableFuture<Void> completion = sub3.completion();
 218             publisher.subscribe(sub3);
 219             // now submit a load of data
 220             int counter = 0;
 221             for (int i = 0; i < TOTAL; i++) {
 222                 List<ByteBuffer> bufs = getBuffer(counter);
 223                 //if (i==2)
 224                     //bufs.get(0).putInt(41, 1234); // error
 225                 counter += BUFSIZE_INT;
 226                 publisher.submit(bufs);
 227                 //if (i % 1000 == 0)
 228                     //Thread.sleep(1000);
 229                 //if (i == 99) {
 230                     //publisher.closeExceptionally(new RuntimeException("Test error"));
 231                     //errorTest = true;
 232                     //break;
 233                 //}
 234             }
 235             if (!errorTest) {
 236                 publisher.close();
 237             }
 238             System.out.println("Publisher completed");
 239             completion.join();
 240             System.out.println("Subscribers completed ok");
 241         } finally {
 242             executor.shutdownNow();
 243         }
 244     }
 245 
 246     static void display(CompletableFuture<?> cf) {
 247         System.out.print (cf);
 248         if (!cf.isDone())
 249             return;
 250         try {
 251             cf.join(); // wont block
 252         } catch (Exception e) {
 253             System.out.println(" " + e);
 254         }
 255     }
 256 
 257 /*
 258     public static void main(String[] args) throws InterruptedException {
 259         WrapperTest test = new WrapperTest();
 260         test.run();
 261     }
 262 */
 263 }