/*
 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.lang;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.lang.reflect.Constructor;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.ProtectionDomain;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
 * A lightweight thread. A Fiber is a <i>user mode</i> thread, it is always
 * scheduled by the Java virtual machine rather than the operating system.
 *
 * <p> While a {@code Fiber} is a {@code Thread} object, Fibers do not support
 * all features of regular threads. In particular, a Fiber is not an <i>active
 * thread</i> in its {@link ThreadGroup thread group} and so it is not enumerated
 * or acted on by thread group operations. A Fiber does not inherit the initial
 * values of {@link InheritableThreadLocal inheritable thread-local variables}.
 * Finally, Fibers do not support setting an {@link
 * Thread#setUncaughtExceptionHandler(UncaughtExceptionHandler) uncaught exception
 * handler}, and operations such as {@link Thread#stop() Thread.stop}, {@link
 * Thread#suspend() Thread.suspend}, and {@link Thread#resume() Thread.resume}.
 */

public final class Fiber extends Thread {
    private static final ContinuationScope FIBER_SCOPE = new ContinuationScope() { };
    private static final ThreadGroup DEFAULT_GROUP = defaultThreadGroup();
    private static final Executor DEFAULT_SCHEDULER = defaultScheduler();
    private static final ScheduledExecutorService UNPARKER = delayedTaskScheduler();
    private static final AccessControlContext INNOCUOUS_ACC = innocuousACC();

    private static final VarHandle STATE;
    private static final VarHandle PARK_PERMIT;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            STATE = l.findVarHandle(Fiber.class, "state", short.class);
            PARK_PERMIT = l.findVarHandle(Fiber.class, "parkPermit", boolean.class);
        } catch (Exception e) {
            throw new InternalError(e);
        }
    }

    // scheduler and continuation
    private final Executor scheduler;
    private final Continuation cont;

    // fiber state
    //
    // Transitions visible in this file:
    //   NEW -> STARTED              start()
    //   STARTED -> RUNNING          runContinuation()
    //   RUNNING -> PARKING1         maybePark(), before the permit is checked
    //   PARKING1 -> RUNNING         maybePark(), permit available or interrupted
    //   PARKING1 -> PARKING2        maybePark(), about to yield
    //   PARKING2 -> PARKED          afterYield(), the yield succeeded
    //   PARKING2 -> PINNED          yieldFailed(), the yield failed
    //   PINNED -> RUNNING           yieldFailed(), after being woken
    //   PARKED -> RUNNING           runContinuation(), continuing after unpark
    //   RUNNING -> TERMINATED       afterTerminate()
    private static final short ST_NEW      = 0;
    private static final short ST_STARTED  = 1;
    private static final short ST_RUNNING  = 2;
    private static final short ST_PARKING1 = 3;
    private static final short ST_PARKING2 = 4;
    private static final short ST_PARKED   = 5;
    private static final short ST_PINNED   = 6;
    private static final short ST_TERMINATED = 99;
    private volatile short state;

    // park/unpark and join/await support
    private volatile boolean parkPermit;
    private final ReentrantLock lock = new ReentrantLock();
    private Condition parking;            // created lazily
    private Condition termination;        // created lazily

    // Thread.interrupt support
    //
    // Lock operations ignore interrupts and so may call Thread.interrupted()
    // or Thread.currentThread().interrupt() to clear or reassert the interrupt
    // status. Changes to the following fields that require synchronization must
    // therefore synchronize on the lock object and not use explicit lock and
    // unlock operations.
    private volatile Thread kernelThread;   // kernel thread when running or pinned
    private volatile boolean interrupted;   // interrupt status

    /**
     * Creates a new {@code Fiber} to run the given task with the default
     * scheduler. The {@link #start() start} method must be invoked to start its
     * execution.
     *
     * @param task the task to execute
     * @throws NullPointerException if task is {@code null}
     */
    public Fiber(Runnable task) {
        this(DEFAULT_SCHEDULER, task);
    }

    /**
     * Creates a new {@code Fiber} to run the given task with the given scheduler.
     * The {@link #start() start} method must be invoked to start its execution.
     *
     * @param scheduler the scheduler
     * @param task the task to execute
     * @throws SecurityException if a security manager is set and it denies
     *         {@link RuntimePermission}{@code ("fiberScheduler")}
     * @throws NullPointerException if the scheduler or task is {@code null}
     */
    public Fiber(Executor scheduler, Runnable task) {
        // threadACC tolerates a null scheduler (it only compares references),
        // so the null checks can follow the mandatory super(...) call
        super(DEFAULT_GROUP, "Fiber", threadACC(scheduler), /*inheritThreadLocals*/false);
        Objects.requireNonNull(scheduler);
        Objects.requireNonNull(task);

        // the permission check only applies to custom schedulers
        SecurityManager sm;
        if (scheduler != DEFAULT_SCHEDULER && (sm = System.getSecurityManager()) != null) {
            sm.checkPermission(new RuntimePermission("fiberScheduler"));
        }

        this.scheduler = scheduler;
        this.cont = new Continuation(FIBER_SCOPE, task) {
            @Override
            protected void onPinned(int reason) { yieldFailed(); }
        };
    }

    /**
     * Creates a new {@code Fiber} to run the given task with the default
     * scheduler and starts its execution.
     *
     * @param task the task to execute
     * @return the fiber
     * @throws NullPointerException if task is {@code null}
     */
    public static Fiber execute(Runnable task) {
        Fiber f = new Fiber(task);
        f.start();
        return f;
    }

    /**
     * Creates a new {@code Fiber} to run the given task with the given
     * scheduler and starts its execution.
     *
     * @param scheduler the scheduler
     * @param task the task to execute
     * @return the fiber
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     * @throws SecurityException if a security manager is set and it denies
     *         {@link RuntimePermission}{@code ("fiberScheduler")}
     * @throws NullPointerException if the scheduler or task is {@code null}
     */
    public static Fiber execute(Executor scheduler, Runnable task) {
        Fiber f = new Fiber(scheduler, task);
        f.start();
        return f;
    }

    /**
     * Causes this fiber to be scheduled for execution.
     *
     * @throws IllegalStateException if already started
     * @throws RejectedExecutionException if using a scheduler and it cannot
     *         accept a task
     */
    @Override
    public void start() {
        if (!stateCompareAndSet(ST_NEW, ST_STARTED))
            throw new IllegalStateException("fiber already started");
        scheduler.execute(this::runContinuation);
    }

    /**
     * Runs or continues execution of the continuation on the current kernel thread.
     */
    private void runContinuation() {
        assert Thread.currentKernelThread().getFiber() == null;

        // set state to RUNNING if not already started
        if (!stateCompareAndSet(ST_STARTED, ST_RUNNING)) {
            // already started

            // If this fiber is parking on another kernel thread then wait for
            // the continuation to yield before it continues on the current kernel
            // thread. If the parking fails, due to the continuation being pinned,
            // then execution will continue on the original kernel thread.
            boolean pinned = waitIfParking();
            if (pinned)
                return;

            // continue on this kernel thread if fiber was parked
            if (stateCompareAndSet(ST_PARKED, ST_RUNNING)) {
                parkPermitGetAndSet(false);  // consume parking permit
            } else {
                // not parked (e.g. running or terminated), nothing to continue
                return;
            }
        }

        attachKernelThread();
        try {
            cont.run();
        } finally {
            detachKernelThread();
            if (cont.isDone()) {
                afterTerminate();
            } else {
                afterYield();
            }
        }
    }

    /**
     * Invoke before continuation is started or continued to attach this fiber
     * to the current kernel thread.
     */
    private void attachKernelThread() {
        Thread t = Thread.currentKernelThread();

        // set kernel thread before forwarding interrupt status
        kernelThread = t;
        if (interrupted)
            t.interrupt();

        // set the fiber so that Thread.currentThread() returns the Fiber object
        t.setFiber(this);
    }

    /**
     * Invoke after a continuation yields or terminates to detach this fiber
     * from the current kernel thread
     */
    private void detachKernelThread() {
        Thread t = Thread.currentKernelThread();
        t.setFiber(null);

        // synchronize with Thread.interrupt to ensure that the kernel thread
        // is not interrupted after it has been detached.
        synchronized (lock) {
            kernelThread = null;
        }
        t.getAndClearInterrupt();
    }

    /**
     * Invoke after yielding to set the state to ST_PARKED and notify any
     * threads waiting for the fiber to park.
     */
    private void afterYield() {
        int oldState = stateGetAndSet(ST_PARKED);
        assert oldState == ST_PARKING2;

        // notify in case another thread is attempting to continue
        lock.lock();
        try {
            Condition parking = this.parking;
            if (parking != null) {
                parking.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Invoke after the continuation completes to set the state to ST_TERMINATED
     * and notify anyone waiting via the join method.
     */
    private void afterTerminate() {
        int oldState = stateGetAndSet(ST_TERMINATED);
        assert oldState == ST_RUNNING;

        // notify everyone waiting for this fiber to terminate
        lock.lock();
        try {
            Condition termination = this.termination;
            if (termination != null) {
                termination.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * If this fiber is parking then wait for the continuation to yield before
     * it continues on this kernel thread. If the yield fails then the kernel
     * thread executing it will park and needs to be signalled so that execution
     * continues on the original kernel thread.
     *
     * @return true if pinned
     */
    private boolean waitIfParking() {
        short s;
        // ST_PARKING1 is a short transient state in maybePark, so spin
        while ((s = stateGet()) == ST_PARKING1) {
            Thread.yield();
        }
        if (s == ST_PARKING2 || s == ST_PINNED) {
            lock.lock();
            try {
                Condition parking = parkingCondition();
                // signalled by afterYield (ST_PARKED) or yieldFailed (ST_PINNED)
                while ((s = stateGet()) == ST_PARKING2) {
                    parking.awaitUninterruptibly();
                }
                if (s == ST_PINNED) {
                    // signal so that execution continues on original thread
                    parking.signalAll();
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
        return false;
    }

    /**
     * Invoked by onPinned when the continuation cannot yield due to a
     * synchronized or native frame on the continuation stack. This method sets
     * the fiber state to ST_PINNED and parks the kernel thread until the
     * parking permit becomes available or the kernel thread is interrupted.
     *
     * <p> The parking permit is re-checked while holding the lock, in a loop.
     * {@link #unpark()} sets the permit before it schedules the task that
     * signals the parking condition, so a signal delivered before this method
     * starts waiting is not lost, and a spurious wakeup does not cause an
     * early return.
     */
    private void yieldFailed() {
        // switch to kernel thread
        Thread t = Thread.currentKernelThread();
        t.setFiber(null);

        if (!stateCompareAndSet(ST_PARKING2, ST_PINNED))
            throw new InternalError();

        boolean parkInterrupted = false;
        lock.lock();
        try {
            Condition parking = parkingCondition();

            // A thread in waitIfParking may already be waiting for the yield
            // to complete (it observed ST_PARKING2). The yield failed so
            // afterYield will not signal it; wake it here so that it observes
            // ST_PINNED instead of blocking indefinitely.
            parking.signalAll();

            // wait for the permit; checking it under the lock closes the
            // window between setting ST_PINNED above and starting to wait
            while (!parkPermit) {
                parking.await();
            }
        } catch (InterruptedException e) {
            parkInterrupted = true;
        } finally {
            lock.unlock();

            // continue running on the kernel thread
            if (!stateCompareAndSet(ST_PINNED, ST_RUNNING))
                throw new InternalError();

            // consume parking permit
            parkPermitGetAndSet(false);

            // switch back to fiber
            t.setFiber(this);
        }

        // restore interrupt status
        if (parkInterrupted) {
            // no need to synchronize here as the interrupt is never cleared
            // asynchronously
            interrupted = true;
            t.interrupt();
        }
    }

    /**
     * Disables the current fiber for scheduling purposes.
     *
     * <p> If this fiber has already been {@link #unpark() unparked} then the
     * parking permit is consumed and this method completes immediately;
     * otherwise the current fiber is disabled for scheduling purposes and lies
     * dormant until it is {@link #unpark() unparked} or {@link #interrupt()
     * interrupted}.
     *
     * @throws IllegalCallerException if not called from a fiber
     */
    public static void park() {
        Fiber fiber = Thread.currentKernelThread().getFiber();
        if (fiber == null)
            throw new IllegalCallerException("not a fiber");
        fiber.maybePark();
    }

    /**
     * Disables the current fiber for scheduling purposes for up to the
     * specified waiting time.
     *
     * <p> If this fiber has already been {@link #unpark() unparked} then the
     * parking permit is consumed and this method completes immediately;
     * otherwise if the time to wait is greater than zero then the current fiber
     * is disabled for scheduling purposes and lies dormant until it is {@link
     * #unpark unparked}, {@link #interrupt() interrupted}, or the waiting time
     * elapses.
     *
     * @param nanos the maximum number of nanoseconds to wait.
     *
     * @throws IllegalCallerException if not called from a fiber
     */
    public static void parkNanos(long nanos) {
        Thread t = Thread.currentKernelThread();
        Fiber fiber = t.getFiber();
        if (fiber == null)
            throw new IllegalCallerException("not a fiber");
        if (nanos > 0) {
            // switch to kernel thread when submitting task to unpark
            t.setFiber(null);
            Future<?> unparker;
            try {
                unparker = UNPARKER.schedule(fiber::unpark, nanos, NANOSECONDS);
            } finally {
                t.setFiber(fiber);
            }
            // now park
            try {
                fiber.maybePark();
            } finally {
                // the timed unpark is no longer needed once park returns
                unparker.cancel(false);
            }
        } else {
            // consume permit when not parking
            fiber.parkPermitGetAndSet(false);
        }
    }

    /**
     * Re-enables this fiber for scheduling. If the fiber was {@link #park()
     * parked} then it will be unblocked, otherwise its next call to {@code park}
     * or {@link #parkNanos(long) parkNanos} is guaranteed not to block.
     *
     * @throws IllegalStateException if the fiber has not started
     * @throws RejectedExecutionException if using a scheduler and it cannot
     *         accept a task
     * @return this fiber
     */
    public Fiber unpark() {
        if (stateGet() == ST_NEW)
            throw new IllegalStateException("fiber not started");
        Thread t = Thread.currentKernelThread();
        Fiber fiber = t.getFiber();
        // only schedule when the permit was not already set and the caller
        // is not this fiber itself
        if (!parkPermitGetAndSet(true) && fiber != this) {
            // switch to kernel thread when submitting task to continue
            if (fiber != null) {
                t.setFiber(null);
            }
            try {
                scheduler.execute(this::runContinuation);
            } finally {
                if (fiber != null) {
                    t.setFiber(fiber);
                }
            }
        }
        return this;
    }

    /**
     * Park or complete immediately.
     *
     * <p> If this fiber has already been unparked or its interrupt status is
     * set then this method completes immediately; otherwise it yields.
     */
    private void maybePark() {
        assert Thread.currentKernelThread().getFiber() == this;

        // prepare to park; important to do this before consuming the parking
        // permit and yielding
        if (!stateCompareAndSet(ST_RUNNING, ST_PARKING1))
            throw new InternalError();

        // consume permit if available, and continue rather than park
        if (parkPermitGetAndSet(false) || interrupted) {
            if (!stateCompareAndSet(ST_PARKING1, ST_RUNNING))
                throw new InternalError();
            return;
        }

        // attempt to yield
        if (!stateCompareAndSet(ST_PARKING1, ST_PARKING2))
            throw new InternalError();

        Continuation.yield(FIBER_SCOPE);

        // continued
        assert stateGet() == ST_RUNNING;
    }

    /**
     * Waits for this fiber to terminate.
     *
     * <p> If the current thread is interrupted while waiting then it will
     * continue to wait. When the thread does return from this method then its
     * interrupt status will be set.
     *
     * @throws IllegalStateException if the fiber has not started
     * @return this fiber
     */
    public Fiber await() {
        boolean joinInterrupted = false;
        boolean terminated = false;
        while (!terminated) {
            try {
                // a timeout of 0 means wait without a time limit
                terminated = joinNanos(0);
            } catch (InterruptedException e) {
                joinInterrupted = true;
            }
        }
        if (joinInterrupted)
            Thread.currentThread().interrupt();
        return this;
    }

    /**
     * Waits for this fiber to terminate. This method does not wait if the time
     * to wait is less than or equal to zero.
     *
     * <p> If the current thread is interrupted while waiting then it will
     * continue to wait. When the thread does return from this method then its
     * interrupt status will be set.
     *
     * @param nanos the maximum time to wait, in nanoseconds
     * @throws IllegalStateException if the fiber has not started
     * @return this fiber
     */
    public Fiber awaitNanos(long nanos) {
        if (stateGet() == ST_NEW) {
            throw new IllegalStateException("fiber not started");
        }
        if (nanos > 0) {
            boolean joinInterrupted = false;
            boolean terminated = false;

            // wait until the fiber terminates or timeout elapses
            while (!terminated && nanos > 0) {
                long startTime = System.nanoTime();
                try {
                    terminated = joinNanos(nanos);
                } catch (InterruptedException e) {
                    joinInterrupted = true;
                }
                // deduct the time spent so a retry only waits for the remainder
                nanos -= (System.nanoTime() - startTime);
            }

            // restore interrupt status
            if (joinInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return this;
    }

    /**
     * Waits up to {@code nanos} nanoseconds for this fiber to terminate.
     * A timeout of {@code 0} means to wait forever.
     *
     * @throws InterruptedException if interrupted while waiting
     * @throws IllegalArgumentException if nanos is negative
     * @throws IllegalStateException if the fiber has not been started
     * @return true if the fiber has terminated
     */
    boolean joinNanos(long nanos) throws InterruptedException {
        if (nanos < 0) {
            throw new IllegalArgumentException();
        }
        lock.lock();
        try {
            short s = stateGet();
            if (s == ST_NEW) {
                throw new IllegalStateException("fiber not started");
            } else if (s == ST_TERMINATED) {
                // already terminated
                return true;
            }
            if (nanos == 0) {
                terminationCondition().await();
            } else {
                terminationCondition().await(nanos, NANOSECONDS);
            }
        } finally {
            lock.unlock();
        }
        // re-read the state; the wait may have ended without termination
        return (stateGet() == ST_TERMINATED);
    }

    /**
     * Interrupts this fiber. Sets the fiber's interrupt status, interrupts the
     * kernel thread if one is currently attached, and then
     * {@link #unpark() unparks} the fiber.
     */
    @Override
    public void interrupt() {
        // synchronize with detachKernelThread to ensure the kernel
        // thread is not interrupted after it has been detached
        synchronized (lock) {
            // set fiber interrupt status and close channel if fiber is
            // blocked in an I/O operation on an InterruptibleChannel.
            super.interrupt();

            // interrupt kernel thread
            Thread t = kernelThread;
            if (t != null) t.interrupt();
        }
        unpark();
    }

    /**
     * Returns the fiber's interrupt status without clearing it.
     */
    @Override
    public boolean isInterrupted() {
        return interrupted;
    }

    /**
     * Sets the fiber's interrupt status. The caller must hold the monitor of
     * the lock object.
     */
    @Override
    void doInterrupt() {
        assert Thread.holdsLock(lock);
        interrupted = true;
    }

    /**
     * Clears the interrupt status and returns the old value. If set, this
     * method clears the fiber's interrupt status and the interrupt status of
     * the kernel thread.
     */
    @Override
    boolean getAndClearInterrupt() {
        assert Thread.currentThread() == this && kernelThread != null;
        boolean oldValue = interrupted;
        if (oldValue) {
            synchronized (lock) {
                interrupted = false;
                kernelThread.getAndClearInterrupt();
            }
        }
        return oldValue;
    }

    /**
     * Maps the internal fiber state to a {@link Thread.State}. While running
     * with a kernel thread attached, the kernel thread's state is reported.
     */
    @Override
    public Thread.State getState() {
        switch (stateGet()) {
            case ST_NEW:
                return State.NEW;
            case ST_STARTED:
                return State.RUNNABLE;
            case ST_RUNNING:
                Thread t = kernelThread;
                if (t != null) {
                    return t.getState();
                } else {
                    return State.RUNNABLE;
                }
            case ST_PARKING1:
            case ST_PARKING2:
                return State.RUNNABLE;  // not yet waiting
            case ST_PARKED:
            case ST_PINNED:
                return State.WAITING;
            case ST_TERMINATED:
                return State.TERMINATED;
            default:
                throw new InternalError();
        }
    }

    /**
     * Returns a string of the form {@code Fiber[<kernel thread name>,<group
     * name>]} when a kernel thread is attached, otherwise
     * {@code Fiber[<no kernel thread>]}.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("Fiber[");
        Thread t = kernelThread;
        if (t != null) {
            sb.append(t.getName());
            ThreadGroup g = t.getThreadGroup();
            if (g != null) {
                sb.append(",");
                sb.append(g.getName());
            }
        } else {
            sb.append("<no kernel thread>");
        }
        sb.append("]");
        return sb.toString();
    }

    /**
     * Returns the Condition object for parking, creating it if needed.
     */
    private Condition parkingCondition() {
        assert lock.isHeldByCurrentThread();
        Condition parking = this.parking;
        if (parking == null) {
            this.parking = parking = lock.newCondition();
        }
        return parking;
    }

    /**
     * Returns the Condition object for termination, creating it if needed.
     */
    private Condition terminationCondition() {
        assert lock.isHeldByCurrentThread();
        Condition termination = this.termination;
        if (termination == null) {
            this.termination = termination = lock.newCondition();
        }
        return termination;
    }

    // -- wrappers for VarHandle methods --

    /** Returns the current value of the state field. */
    private short stateGet() {
        return (short) STATE.get(this);
    }

    /** Atomically sets the state field and returns the previous value. */
    private short stateGetAndSet(short newValue) {
        return (short) STATE.getAndSet(this, newValue);
    }

    /** Atomically sets the state field to newValue if it equals expectedValue. */
    private boolean stateCompareAndSet(short expectedValue, short newValue) {
        return STATE.compareAndSet(this, expectedValue, newValue);
    }

    /** Atomically sets the parking permit and returns the previous value. */
    private boolean parkPermitGetAndSet(boolean newValue) {
        return (boolean) PARK_PERMIT.getAndSet(this, newValue);
    }

    /**
     * The thread group for fibers. As Fibers are unstarted Threads, the
     * group will be empty.
     */
    private static ThreadGroup defaultThreadGroup() {
        return AccessController.doPrivileged(new PrivilegedAction<>() {
            public ThreadGroup run() {
                // walk up to the root thread group
                ThreadGroup group = Thread.currentThread().getThreadGroup();
                for (ThreadGroup p; (p = group.getParent()) != null; )
                    group = p;
                return new ThreadGroup(group, "Fibers");
            }});
    }

    /**
     * Creates the default scheduler
     *
     * For now, it uses the ForkJoin's InnocuousForkJoinWorkerThreadFactory to
     * avoid creating a duplicate ForkJoinWorkerThreadFactory implementation.
     */
    private static Executor defaultScheduler() {
        PrivilegedAction<Executor> pa = () -> {
            // parallelism defaults to the number of available processors
            int parallelism;
            String s = System.getProperty("jdk.defaultScheduler.parallelism");
            if (s != null) {
                parallelism = Integer.parseInt(s);
            } else {
                parallelism = Runtime.getRuntime().availableProcessors();
            }
            // the factory class is not accessible from here, use reflection
            ForkJoinWorkerThreadFactory factory;
            try {
                Class<?> clazz = Class.forName(
                    "java.util.concurrent.ForkJoinPool$InnocuousForkJoinWorkerThreadFactory");
                Constructor<?> ctor = clazz.getDeclaredConstructor();
                ctor.setAccessible(true);
                factory = (ForkJoinWorkerThreadFactory) ctor.newInstance();
            } catch (Exception e) {
                throw new InternalError(e);
            }
            // handler deliberately does nothing
            Thread.UncaughtExceptionHandler ueh = (t, e) -> { };
            boolean asyncMode = Boolean.getBoolean("jdk.defaultScheduler.asyncMode");
            return new ForkJoinPool(parallelism, factory, ueh, asyncMode);
        };
        return AccessController.doPrivileged(pa);
    }

    /**
     * Creates the ScheduledThreadPoolExecutor used to schedule unparking.
     */
    private static ScheduledExecutorService delayedTaskScheduler() {
        ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
            Executors.newScheduledThreadPool(1, r ->
                AccessController.doPrivileged(new PrivilegedAction<>() {
                    public Thread run() {
                        Thread t = new Thread(r);
                        t.setName("Unparker");
                        t.setDaemon(true);
                        return t;
                    }}));
        // parkNanos cancels its timed unpark once park returns; remove
        // cancelled tasks from the work queue at that point
        stpe.setRemoveOnCancelPolicy(true);
        return stpe;
    }

    /**
     * Returns the AccessControlContext to inherit when creating a Fiber.
     * This method returns an innocuous AccessControlContext for the default
     * scheduler.
     */
    private static AccessControlContext threadACC(Executor scheduler) {
        return (scheduler == DEFAULT_SCHEDULER) ? INNOCUOUS_ACC : null;
    }

    /**
     * Return an AccessControlContext that doesn't support any permissions.
     */
    private static AccessControlContext innocuousACC() {
        return new AccessControlContext(new ProtectionDomain[] {
            new ProtectionDomain(null, null)
        });
    }
}