1 /*
   2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 8195160
  26  * @summary Spurious NPE from RdmaSocket.getIn/OutputStream
  27  * @requires (os.family == "linux")
  28  * @library .. /test/lib
  29  * @build RsocketTest
  30  * @run main/othervm -Djava.net.preferIPv4Stack=true Streams
  31  */
  32 
  33 import java.io.IOException;
  34 import java.lang.reflect.Constructor;
  35 import java.net.ServerSocket;
  36 import java.net.Socket;
  37 import java.net.InetAddress;
  38 import java.net.InetSocketAddress;
  39 import java.util.concurrent.Phaser;
  40 import jdk.net.Sockets;
  41 
   42 // Racy test: it will not always fail, but if it does fail then there is a problem.
  43 
  44 public class Streams {
  45     static final int NUM_THREADS = 10;
  46     static volatile boolean failed;
  47     static ServerSocket ss;
  48     static final Phaser startingGate = new Phaser(NUM_THREADS + 1);
  49     static InetAddress ia;
  50 
  51     public static void main(String[] args) throws Exception {
  52         if (!RsocketTest.isRsocketAvailable())
  53             return;
  54 
  55         InetAddress ia = InetAddress.getLocalHost();
  56 
  57         try {
  58             ss = Sockets.openRdmaServerSocket();
  59             ss.bind(new InetSocketAddress(ia, 0));
  60             runTest(OutputStreamGetter.class);
  61             runTest(InputStreamGetter.class);
  62         } catch (Exception e) {
  63             e.printStackTrace();
  64             failed = true;
  65         } 
  66 
  67         if (failed)
  68             throw new RuntimeException("Failed, check output");
  69     }
  70 
  71     static void runTest(Class<? extends StreamGetter> klass)
  72         throws Exception
  73     {
  74         final int port = ss.getLocalPort();
  75         Socket[] sockets = new Socket[NUM_THREADS];
  76 
  77         for (int i = 0; i < NUM_THREADS; i++) {
  78             sockets[i] = Sockets.openRdmaSocket();
  79             final Socket cs = sockets[i];
  80             Thread t = new Thread() {
  81                 public void run() {
  82                     try {
  83                         cs.connect(new InetSocketAddress(InetAddress.getLocalHost(), port));
  84                     } catch (Exception e) {
  85                         e.printStackTrace();
  86                         failed = true;
  87                     }
  88                 }
  89             };
  90             t.start();
  91             Socket socket = ss.accept();
  92             cs.close();
  93         }
  94 
  95         Constructor<? extends StreamGetter> ctr = klass.getConstructor(Socket.class);
  96 
  97         Thread[] threads = new Thread[NUM_THREADS];
  98         for (int i=0; i<NUM_THREADS; i++)
  99             threads[i] = ctr.newInstance(sockets[i]);
 100         for (int i=0; i<NUM_THREADS; i++)
 101             threads[i].start();
 102 
 103         startingGate.arriveAndAwaitAdvance();
 104         for (int i=0; i<NUM_THREADS; i++)
 105             sockets[i].close();
 106 
 107         for (int i=0; i<NUM_THREADS; i++)
 108             threads[i].join();
 109     }
 110 
 111     static abstract class StreamGetter extends Thread {
 112         final Socket socket;
 113         StreamGetter(Socket s) { socket = s; }
 114 
 115         @Override
 116         public void run() {
 117             try {
 118                 startingGate.arriveAndAwaitAdvance();
 119                 getStream();
 120             } catch (IOException x) {
 121                 // OK, socket may be closed
 122             } catch (NullPointerException x) {
 123                 x.printStackTrace();
 124                 failed = true;
 125             }
 126         }
 127 
 128         abstract void getStream() throws IOException;
 129     }
 130 
 131     static class InputStreamGetter extends StreamGetter {
 132         public InputStreamGetter(Socket s) { super(s); }
 133         void getStream() throws IOException {
 134             socket.getInputStream();
 135         }
 136     }
 137 
 138     static class OutputStreamGetter extends StreamGetter {
 139         public OutputStreamGetter(Socket s) { super(s); }
 140         void getStream() throws IOException {
 141             socket.getOutputStream();
 142         }
 143     }
 144 }