1 /* 2 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http.internal.common; 27 28 import java.util.concurrent.Flow; 29 import java.util.concurrent.atomic.AtomicBoolean; 30 31 /** 32 * Maintains subscription counter and provides primitives for 33 * - accessing window 34 * - reducing window when delivering items externally 35 * - resume delivery when window was zero previously 36 * 37 * @author mimcmah 38 */ 39 public class SubscriptionBase implements Flow.Subscription { 40 41 final Demand demand = new Demand(); 42 43 final SequentialScheduler scheduler; // when window was zero and is opened, run this 44 final Runnable cancelAction; // when subscription cancelled, run this 45 final AtomicBoolean cancelled; 46 47 public SubscriptionBase(SequentialScheduler scheduler, Runnable cancelAction) { 48 this.scheduler = scheduler; 49 this.cancelAction = cancelAction; 50 this.cancelled = new AtomicBoolean(false); 51 } 52 53 @Override 54 public void request(long n) { 55 if (demand.increase(n)) 56 scheduler.runOrSchedule(); 57 } 58 59 60 61 @Override 62 public synchronized String toString() { 63 return "SubscriptionBase: window = " + demand.get() + 64 " cancelled = " + cancelled.toString(); 65 } 66 67 /** 68 * Returns true if the window was reduced by 1. In that case 69 * items must be supplied to subscribers and the scheduler run 70 * externally. If the window could not be reduced by 1, then false 71 * is returned and the scheduler will run later when the window is updated. 72 */ 73 public boolean tryDecrement() { 74 return demand.tryDecrement(); 75 } 76 77 public long window() { 78 return demand.get(); 79 } 80 81 @Override 82 public void cancel() { 83 if (cancelled.getAndSet(true)) 84 return; 85 scheduler.stop(); 86 cancelAction.run(); 87 } 88 }