/* * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package jdk.incubator.http; import java.util.Iterator; import java.util.concurrent.Flow; import jdk.incubator.http.internal.common.Demand; import jdk.incubator.http.internal.common.SequentialScheduler; /** * A Publisher that publishes items obtained from the given Iterable. Each new * subscription gets a new Iterator. */ class PullPublisher implements Flow.Publisher { // Only one of `iterable` and `throwable` can be non-null. throwable is // non-null when an error has been encountered, by the creator of // PullPublisher, while subscribing the subscriber, but before subscribe has // completed. private final Iterable iterable; private final Throwable throwable; PullPublisher(Iterable iterable, Throwable throwable) { this.iterable = iterable; this.throwable = throwable; } PullPublisher(Iterable iterable) { this(iterable, null); } @Override public void subscribe(Flow.Subscriber subscriber) { Subscription sub; if (throwable != null) { assert iterable == null : "non-null iterable: " + iterable; sub = new Subscription(subscriber, null, throwable); } else { assert throwable == null : "non-null exception: " + throwable; sub = new Subscription(subscriber, iterable.iterator(), null); } subscriber.onSubscribe(sub); if (throwable != null) { sub.pullScheduler.runOrSchedule(); } } private class Subscription implements Flow.Subscription { private final Flow.Subscriber subscriber; private final Iterator iter; private volatile boolean completed; private volatile boolean cancelled; private volatile Throwable error; final SequentialScheduler pullScheduler = new SequentialScheduler(new PullTask()); private final Demand demand = new Demand(); Subscription(Flow.Subscriber subscriber, Iterator iter, Throwable throwable) { this.subscriber = subscriber; this.iter = iter; this.error = throwable; } final class PullTask extends SequentialScheduler.CompleteRestartableTask { @Override protected void run() { if (completed || cancelled) { return; } Throwable t = error; if (t != null) { completed = true; pullScheduler.stop(); subscriber.onError(t); return; } while (demand.tryDecrement() && !cancelled) { if (!iter.hasNext()) { break; } else { subscriber.onNext(iter.next()); } } if (!iter.hasNext() && !cancelled) { completed = true; pullScheduler.stop(); subscriber.onComplete(); } } } @Override public void request(long n) { if (cancelled) return; // no-op if (n <= 0) { error = new IllegalArgumentException("illegal non-positive request:" + n); } else { demand.increase(n); } pullScheduler.runOrSchedule(); } @Override public void cancel() { cancelled = true; } } }