/*
 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
24 */ 25 26 #include <assert.h> 27 #include "java_net_SocketOptions.h" 28 #include "jdk_net_RdmaSocketOptions.h" 29 #include "jvm.h" 30 #include <netinet/tcp.h> // defines TCP_NODELAY 31 #include "net_util.h" 32 #include "rdma_util_md.h" 33 #include "Rsocket.h" 34 #include <stdlib.h> 35 #include <sys/ioctl.h> 36 #include <sys/resource.h> 37 #include <pthread.h> 38 39 #define BLOCKING_IO_RETURN_INT(FD, FUNC) { \ 40 int ret; \ 41 threadEntry_t self; \ 42 fdEntry_t *fdEntry = getFdEntry(FD); \ 43 if (fdEntry == NULL) { \ 44 errno = EBADF; \ 45 return -1; \ 46 } \ 47 do { \ 48 startOp(fdEntry, &self); \ 49 ret = FUNC; \ 50 endOp(fdEntry, &self); \ 51 } while (ret == -1 && errno == EINTR); \ 52 return ret; \ 53 } 54 55 typedef struct threadEntry { 56 pthread_t thr; /* this thread */ 57 struct threadEntry *next; /* next thread */ 58 int intr; /* interrupted */ 59 } threadEntry_t; 60 61 typedef struct { 62 pthread_mutex_t lock; /* fd lock */ 63 threadEntry_t *threads; /* threads blocked on fd */ 64 } fdEntry_t; 65 66 static int sigWakeup = (__SIGRTMAX - 2); 67 68 static fdEntry_t* fdTable = NULL; 69 70 static const int fdTableMaxSize = 0x1000; /* 4K */ 71 72 static int fdTableLen = 0; 73 74 static int fdLimit = 0; 75 76 static fdEntry_t** fdOverflowTable = NULL; 77 78 static int fdOverflowTableLen = 0; 79 80 static const int fdOverflowTableSlabSize = 0x10000; /* 64k */ 81 82 pthread_mutex_t fdOverflowTableLock = PTHREAD_MUTEX_INITIALIZER; 83 84 static void sig_wakeup(int sig) { 85 } 86 87 static void __attribute((constructor)) init() { 88 struct rlimit nbr_files; 89 sigset_t sigset; 90 struct sigaction sa; 91 int i = 0; 92 93 if (-1 == getrlimit(RLIMIT_NOFILE, &nbr_files)) { 94 fprintf(stderr, "library initialization failed - " 95 "unable to get max # of allocated fds\n"); 96 abort(); 97 } 98 if (nbr_files.rlim_max != RLIM_INFINITY) { 99 fdLimit = nbr_files.rlim_max; 100 } else { 101 fdLimit = INT_MAX; 102 } 103 104 fdTableLen = fdLimit < fdTableMaxSize ? 
fdLimit : fdTableMaxSize; 105 fdTable = (fdEntry_t*) calloc(fdTableLen, sizeof(fdEntry_t)); 106 if (fdTable == NULL) { 107 fprintf(stderr, "library initialization failed - " 108 "unable to allocate file descriptor table - out of memory"); 109 abort(); 110 } else { 111 for (i = 0; i < fdTableLen; i ++) { 112 pthread_mutex_init(&fdTable[i].lock, NULL); 113 } 114 } 115 116 if (fdLimit > fdTableMaxSize) { 117 fdOverflowTableLen = ((fdLimit - fdTableMaxSize) / fdOverflowTableSlabSize) + 1; 118 fdOverflowTable = (fdEntry_t**) calloc(fdOverflowTableLen, sizeof(fdEntry_t*)); 119 if (fdOverflowTable == NULL) { 120 fprintf(stderr, "library initialization failed - " 121 "unable to allocate file descriptor overflow table - out of memory"); 122 abort(); 123 } 124 } 125 126 sa.sa_handler = sig_wakeup; 127 sa.sa_flags = 0; 128 sigemptyset(&sa.sa_mask); 129 sigaction(sigWakeup, &sa, NULL); 130 131 sigemptyset(&sigset); 132 sigaddset(&sigset, sigWakeup); 133 sigprocmask(SIG_UNBLOCK, &sigset, NULL); 134 } 135 136 static inline fdEntry_t *getFdEntry(int fd) { 137 fdEntry_t* result = NULL; 138 139 if (fd < 0) { 140 return NULL; 141 } 142 143 assert(fd < fdLimit); 144 145 if (fd < fdTableMaxSize) { 146 assert(fd < fdTableLen); 147 result = &fdTable[fd]; 148 } else { 149 const int indexInOverflowTable = fd - fdTableMaxSize; 150 const int rootindex = indexInOverflowTable / fdOverflowTableSlabSize; 151 const int slabindex = indexInOverflowTable % fdOverflowTableSlabSize; 152 fdEntry_t* slab = NULL; 153 assert(rootindex < fdOverflowTableLen); 154 assert(slabindex < fdOverflowTableSlabSize); 155 pthread_mutex_lock(&fdOverflowTableLock); 156 if (fdOverflowTable[rootindex] == NULL) { 157 fdEntry_t* const newSlab = 158 (fdEntry_t*)calloc(fdOverflowTableSlabSize, sizeof(fdEntry_t)); 159 if (newSlab == NULL) { 160 fprintf(stderr, "Unable to allocate file descriptor overflow" 161 " table slab - out of memory"); 162 pthread_mutex_unlock(&fdOverflowTableLock); 163 abort(); 164 } else { 165 int i; 
166 for (i = 0; i < fdOverflowTableSlabSize; i ++) { 167 pthread_mutex_init(&newSlab[i].lock, NULL); 168 } 169 fdOverflowTable[rootindex] = newSlab; 170 } 171 } 172 pthread_mutex_unlock(&fdOverflowTableLock); 173 slab = fdOverflowTable[rootindex]; 174 result = &slab[slabindex]; 175 } 176 return result; 177 } 178 179 static inline void startOp(fdEntry_t *fdEntry, threadEntry_t *self) { 180 self->thr = pthread_self(); 181 self->intr = 0; 182 183 pthread_mutex_lock(&(fdEntry->lock)); 184 { 185 self->next = fdEntry->threads; 186 fdEntry->threads = self; 187 } 188 pthread_mutex_unlock(&(fdEntry->lock)); 189 } 190 191 static inline void endOp (fdEntry_t *fdEntry, threadEntry_t *self) { 192 int orig_errno = errno; 193 pthread_mutex_lock(&(fdEntry->lock)); 194 { 195 threadEntry_t *curr, *prev=NULL; 196 curr = fdEntry->threads; 197 while (curr != NULL) { 198 if (curr == self) { 199 if (curr->intr) { 200 orig_errno = EBADF; 201 } 202 if (prev == NULL) { 203 fdEntry->threads = curr->next; 204 } else { 205 prev->next = curr->next; 206 } 207 break; 208 } 209 prev = curr; 210 curr = curr->next; 211 } 212 } 213 pthread_mutex_unlock(&(fdEntry->lock)); 214 errno = orig_errno; 215 } 216 217 #define RESTARTABLE(_cmd, _result) do { \ 218 do { \ 219 _result = _cmd; \ 220 } while((_result == -1) && (errno == EINTR)); \ 221 } while(0) 222 223 int rdma_supported() { 224 int one = 1; 225 int rv, s; 226 s = rs_socket(PF_INET, SOCK_STREAM, 0); 227 if (s < 0) { 228 return JNI_FALSE; 229 } 230 return JNI_TRUE; 231 } 232 233 int RDMA_SocketAvailable(int s, jint *pbytes) { 234 int result; 235 RESTARTABLE(ioctl(s, FIONREAD, pbytes), result); 236 return (result == -1) ? 
0 : 1; 237 } 238 239 int RDMA_MapSocketOption(jint cmd, int *level, int *optname) { 240 static struct { 241 jint cmd; 242 int level; 243 int optname; 244 } const opts[] = { 245 { java_net_SocketOptions_TCP_NODELAY, IPPROTO_TCP, TCP_NODELAY }, 246 { java_net_SocketOptions_SO_SNDBUF, SOL_SOCKET, SO_SNDBUF }, 247 { java_net_SocketOptions_SO_RCVBUF, SOL_SOCKET, SO_RCVBUF }, 248 { java_net_SocketOptions_SO_REUSEADDR, SOL_SOCKET, SO_REUSEADDR }, 249 { jdk_net_RdmaSocketOptions_SQSIZE, SOL_RDMA, RDMA_SQSIZE }, 250 { jdk_net_RdmaSocketOptions_RQSIZE, SOL_RDMA, RDMA_RQSIZE }, 251 { jdk_net_RdmaSocketOptions_INLINE, SOL_RDMA, RDMA_INLINE }, 252 }; 253 int i; 254 for (i=0; i<(int)(sizeof(opts) / sizeof(opts[0])); i++) { 255 if (cmd == opts[i].cmd) { 256 *level = opts[i].level; 257 *optname = opts[i].optname; 258 return 0; 259 } 260 } 261 return -1; 262 } 263 264 int RDMA_GetSockOpt(int fd, int level, int opt, void *result, int *len) { 265 int rv; 266 socklen_t socklen = *len; 267 268 rv = rs_getsockopt(fd, level, opt, result, &socklen); 269 *len = socklen; 270 271 if (rv < 0) { 272 return rv; 273 } 274 275 if ((level == SOL_SOCKET) && ((opt == SO_SNDBUF) 276 || (opt == SO_RCVBUF))) { 277 int n = *((int *)result); 278 n /= 2; 279 *((int *)result) = n; 280 } 281 return rv; 282 } 283 284 int RDMA_SetSockOpt(int fd, int level, int opt, const void *arg, int len) { 285 int *bufsize; 286 if (level == SOL_SOCKET && opt == SO_RCVBUF) { 287 int *bufsize = (int *)arg; 288 if (*bufsize < 1024) { 289 *bufsize = 1024; 290 } 291 } 292 293 return rs_setsockopt(fd, level, opt, arg, len); 294 } 295 296 int RDMA_Bind(int fd, SOCKETADDRESS *sa, int len) { 297 int rv; 298 int arg, alen; 299 300 if (sa->sa.sa_family == AF_INET) { 301 if ((ntohl(sa->sa4.sin_addr.s_addr) & 0x7f0000ff) == 0x7f0000ff) { 302 errno = EADDRNOTAVAIL; 303 return -1; 304 } 305 } 306 rv = rs_bind(fd, &sa->sa, len); 307 return rv; 308 } 309 310 jint RDMA_Wait(JNIEnv *env, jint fd, jint flags, jint timeout) { 311 jlong 
prevNanoTime = JVM_NanoTime(env, 0); 312 jlong nanoTimeout = (jlong) timeout * NET_NSEC_PER_MSEC; 313 jint read_rv; 314 315 while (1) { 316 jlong newNanoTime; 317 struct pollfd pfd; 318 pfd.fd = fd; 319 pfd.events = 0; 320 if (flags & NET_WAIT_READ) 321 pfd.events |= POLLIN; 322 if (flags & NET_WAIT_WRITE) 323 pfd.events |= POLLOUT; 324 if (flags & NET_WAIT_CONNECT) 325 pfd.events |= POLLOUT; 326 327 errno = 0; 328 read_rv = RDMA_Poll(&pfd, 1, nanoTimeout / NET_NSEC_PER_MSEC); 329 330 newNanoTime = JVM_NanoTime(env, 0); 331 nanoTimeout -= (newNanoTime - prevNanoTime); 332 if (nanoTimeout < NET_NSEC_PER_MSEC) { 333 return read_rv > 0 ? 0 : -1; 334 } 335 prevNanoTime = newNanoTime; 336 337 if (read_rv > 0) { 338 break; 339 } 340 } 341 return (nanoTimeout / NET_NSEC_PER_MSEC); 342 } 343 344 static int rdma_closefd(int fd1, int fd2) { 345 int rv, orig_errno; 346 fdEntry_t *fdEntry = getFdEntry(fd2); 347 if (fdEntry == NULL) { 348 errno = EBADF; 349 return -1; 350 } 351 352 pthread_mutex_lock(&(fdEntry->lock)); 353 do { 354 if (fd1 < 0) { 355 rv = rs_close(fd2); 356 } 357 } while (rv == -1 && errno == EINTR); 358 359 threadEntry_t *curr = fdEntry->threads; 360 while (curr != NULL) { 361 curr->intr = 1; 362 pthread_kill( curr->thr, sigWakeup ); 363 curr = curr->next; 364 } 365 orig_errno = errno; 366 pthread_mutex_unlock(&(fdEntry->lock)); 367 errno = orig_errno; 368 return rv; 369 } 370 371 int RDMA_Dup2(int fd, int fd2) { 372 if (fd < 0) { 373 errno = EBADF; 374 return -1; 375 } 376 return rdma_closefd(fd, fd2); 377 } 378 379 int RDMA_SocketClose(int fd) { 380 return rdma_closefd(-1, fd); 381 } 382 383 int RDMA_Read(int s, void* buf, size_t len) { 384 BLOCKING_IO_RETURN_INT(s, rs_recv(s, buf, len, 0)); 385 } 386 387 int RDMA_NonBlockingRead(int s, void* buf, size_t len) { 388 BLOCKING_IO_RETURN_INT(s, rs_recv(s, buf, len, MSG_DONTWAIT)); 389 } 390 391 int RDMA_ReadV(int s, const struct iovec * vector, int count) { 392 BLOCKING_IO_RETURN_INT(s, rs_readv(s, vector, 
count) ); 393 } 394 395 int RDMA_RecvFrom(int s, void *buf, int len, unsigned int flags, 396 struct sockaddr *from, socklen_t *fromlen) { 397 BLOCKING_IO_RETURN_INT(s, rs_recvfrom(s, buf, len, flags, from, fromlen)); 398 } 399 400 int RDMA_Send(int s, void *msg, int len, unsigned int flags) { 401 BLOCKING_IO_RETURN_INT(s, rs_send(s, msg, len, flags)); 402 } 403 404 int RDMA_WriteV(int s, const struct iovec * vector, int count) { 405 BLOCKING_IO_RETURN_INT(s, rs_writev(s, vector, count)); 406 } 407 408 int NET_RSendTo(int s, const void *msg, int len, unsigned int 409 flags, const struct sockaddr *to, int tolen) { 410 BLOCKING_IO_RETURN_INT(s, rs_sendto(s, msg, len, flags, to, tolen)); 411 } 412 413 int RDMA_Accept(int s, struct sockaddr *addr, socklen_t *addrlen) { 414 BLOCKING_IO_RETURN_INT(s, rs_accept(s, addr, addrlen)); 415 } 416 417 int RDMA_Connect(int s, struct sockaddr *addr, int addrlen) { 418 BLOCKING_IO_RETURN_INT(s, rs_connect(s, addr, addrlen)); 419 } 420 421 int RDMA_Poll(struct pollfd *ufds, unsigned int nfds, int timeout) { 422 BLOCKING_IO_RETURN_INT(ufds[0].fd, rs_poll(ufds, nfds, timeout)); 423 } 424 425 int RDMA_Timeout(JNIEnv *env, int s, long timeout, jlong nanoTimeStamp) { 426 jlong prevNanoTime = nanoTimeStamp; 427 jlong nanoTimeout = (jlong)timeout * NET_NSEC_PER_MSEC; 428 fdEntry_t *fdEntry = getFdEntry(s); 429 430 if (fdEntry == NULL) { 431 errno = EBADF; 432 return -1; 433 } 434 435 for(;;) { 436 struct pollfd pfd; 437 int rv; 438 threadEntry_t self; 439 440 pfd.fd = s; 441 pfd.events = POLLIN | POLLERR; 442 443 startOp(fdEntry, &self); 444 rv = rs_poll(&pfd, 1, nanoTimeout / NET_NSEC_PER_MSEC); 445 endOp(fdEntry, &self); 446 if (rv < 0 && errno == EINTR) { 447 jlong newNanoTime = JVM_NanoTime(env, 0); 448 nanoTimeout -= newNanoTime - prevNanoTime; 449 if (nanoTimeout < NET_NSEC_PER_MSEC) { 450 return 0; 451 } 452 prevNanoTime = newNanoTime; 453 } else { 454 return rv; 455 } 456 } 457 }