44
45 import java.io.ByteArrayInputStream;
46 import java.io.ByteArrayOutputStream;
47 import java.io.ObjectInputStream;
48 import java.io.ObjectOutputStream;
49 import java.lang.invoke.MethodHandles;
50 import java.lang.invoke.VarHandle;
51 import java.util.ArrayList;
52 import java.util.Iterator;
53 import java.util.List;
54 import java.util.concurrent.LinkedTransferQueue;
55 import java.util.concurrent.ThreadLocalRandom;
56 import java.util.concurrent.TimeUnit;
57 import static java.util.stream.Collectors.toList;
58 import java.util.function.Consumer;
59
60 @Test
61 public class WhiteBox {
62 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
63 final VarHandle HEAD, TAIL, ITEM, NEXT;
64 final int SWEEP_THRESHOLD;
65
66 public WhiteBox() throws ReflectiveOperationException {
67 Class<?> qClass = LinkedTransferQueue.class;
68 Class<?> nodeClass = Class.forName(qClass.getName() + "$Node");
69 MethodHandles.Lookup lookup
70 = MethodHandles.privateLookupIn(qClass, MethodHandles.lookup());
71 HEAD = lookup.findVarHandle(qClass, "head", nodeClass);
72 TAIL = lookup.findVarHandle(qClass, "tail", nodeClass);
73 NEXT = lookup.findVarHandle(nodeClass, "next", nodeClass);
74 ITEM = lookup.findVarHandle(nodeClass, "item", Object.class);
75 SWEEP_THRESHOLD = (int)
76 lookup.findStaticVarHandle(qClass, "SWEEP_THRESHOLD", int.class)
77 .get();
78 }
79
80 Object head(LinkedTransferQueue q) { return HEAD.getVolatile(q); }
81 Object tail(LinkedTransferQueue q) { return TAIL.getVolatile(q); }
82 Object item(Object node) { return ITEM.getVolatile(node); }
83 Object next(Object node) { return NEXT.getVolatile(node); }
84
85 int nodeCount(LinkedTransferQueue q) {
86 int i = 0;
87 for (Object p = head(q); p != null; ) {
88 i++;
89 if (p == (p = next(p))) p = head(q);
90 }
91 return i;
92 }
93
94 int tailCount(LinkedTransferQueue q) {
95 int i = 0;
96 for (Object p = tail(q); p != null; ) {
97 i++;
348 }
349
350 @SuppressWarnings("unchecked")
351 <T> T serialClone(T o) {
352 try {
353 ObjectInputStream ois = new ObjectInputStream
354 (new ByteArrayInputStream(serialBytes(o)));
355 T clone = (T) ois.readObject();
356 assertNotSame(o, clone);
357 assertSame(o.getClass(), clone.getClass());
358 return clone;
359 } catch (Exception fail) {
360 throw new AssertionError(fail);
361 }
362 }
363
364 @Test
365 public void testSerialization() {
366 LinkedTransferQueue q = serialClone(new LinkedTransferQueue());
367 assertInvariants(q);
368 }
369
370 public void cancelledNodeSweeping() throws Throwable {
371 assertEquals(SWEEP_THRESHOLD & (SWEEP_THRESHOLD - 1), 0);
372 LinkedTransferQueue q = new LinkedTransferQueue();
373 Thread blockHead = null;
374 if (rnd.nextBoolean()) {
375 blockHead = new Thread(
376 () -> { try { q.take(); } catch (InterruptedException ok) {}});
377 blockHead.start();
378 while (nodeCount(q) != 2) { Thread.yield(); }
379 assertTrue(q.hasWaitingConsumer());
380 assertEquals(q.getWaitingConsumerCount(), 1);
381 }
382 int initialNodeCount = nodeCount(q);
383
384 // Some dead nodes do in fact accumulate ...
385 if (blockHead != null)
386 while (nodeCount(q) < initialNodeCount + SWEEP_THRESHOLD / 2)
387 q.poll(1L, TimeUnit.MICROSECONDS);
388
389 // ... but no more than SWEEP_THRESHOLD nodes accumulate
390 for (int i = rnd.nextInt(SWEEP_THRESHOLD * 10); i-->0; )
391 q.poll(1L, TimeUnit.MICROSECONDS);
392 assertTrue(nodeCount(q) <= initialNodeCount + SWEEP_THRESHOLD);
393
394 if (blockHead != null) {
395 blockHead.interrupt();
396 blockHead.join();
397 }
398 }
399
400 /** Checks conditions which should always be true. */
401 void assertInvariants(LinkedTransferQueue q) {
402 assertNotNull(head(q));
403 assertNotNull(tail(q));
404 // head is never self-linked (but tail may!)
405 for (Object h; next(h = head(q)) == h; )
406 assertNotSame(h, head(q)); // must be update race
407 }
408 }
|
44
45 import java.io.ByteArrayInputStream;
46 import java.io.ByteArrayOutputStream;
47 import java.io.ObjectInputStream;
48 import java.io.ObjectOutputStream;
49 import java.lang.invoke.MethodHandles;
50 import java.lang.invoke.VarHandle;
51 import java.util.ArrayList;
52 import java.util.Iterator;
53 import java.util.List;
54 import java.util.concurrent.LinkedTransferQueue;
55 import java.util.concurrent.ThreadLocalRandom;
56 import java.util.concurrent.TimeUnit;
57 import static java.util.stream.Collectors.toList;
58 import java.util.function.Consumer;
59
60 @Test
61 public class WhiteBox {
62 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
63 final VarHandle HEAD, TAIL, ITEM, NEXT;
64
65 public WhiteBox() throws ReflectiveOperationException {
66 Class<?> qClass = LinkedTransferQueue.class;
67 Class<?> nodeClass = Class.forName(qClass.getName() + "$Node");
68 MethodHandles.Lookup lookup
69 = MethodHandles.privateLookupIn(qClass, MethodHandles.lookup());
70 HEAD = lookup.findVarHandle(qClass, "head", nodeClass);
71 TAIL = lookup.findVarHandle(qClass, "tail", nodeClass);
72 NEXT = lookup.findVarHandle(nodeClass, "next", nodeClass);
73 ITEM = lookup.findVarHandle(nodeClass, "item", Object.class);
74 }
75
76 Object head(LinkedTransferQueue q) { return HEAD.getVolatile(q); }
77 Object tail(LinkedTransferQueue q) { return TAIL.getVolatile(q); }
78 Object item(Object node) { return ITEM.getVolatile(node); }
79 Object next(Object node) { return NEXT.getVolatile(node); }
80
81 int nodeCount(LinkedTransferQueue q) {
82 int i = 0;
83 for (Object p = head(q); p != null; ) {
84 i++;
85 if (p == (p = next(p))) p = head(q);
86 }
87 return i;
88 }
89
90 int tailCount(LinkedTransferQueue q) {
91 int i = 0;
92 for (Object p = tail(q); p != null; ) {
93 i++;
344 }
345
346 @SuppressWarnings("unchecked")
347 <T> T serialClone(T o) {
348 try {
349 ObjectInputStream ois = new ObjectInputStream
350 (new ByteArrayInputStream(serialBytes(o)));
351 T clone = (T) ois.readObject();
352 assertNotSame(o, clone);
353 assertSame(o.getClass(), clone.getClass());
354 return clone;
355 } catch (Exception fail) {
356 throw new AssertionError(fail);
357 }
358 }
359
360 @Test
361 public void testSerialization() {
362 LinkedTransferQueue q = serialClone(new LinkedTransferQueue());
363 assertInvariants(q);
364 }
365
366 /** Checks conditions which should always be true. */
367 void assertInvariants(LinkedTransferQueue q) {
368 assertNotNull(head(q));
369 assertNotNull(tail(q));
370 // head is never self-linked (but tail may!)
371 for (Object h; next(h = head(q)) == h; )
372 assertNotSame(h, head(q)); // must be update race
373 }
374 }
|