1 /* 2 * Copyright (c) 1997, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 #include <dlfcn.h> 26 #include <errno.h> 27 #include <net/if.h> 28 #include <netinet/tcp.h> // defines TCP_NODELAY 29 #include <stdlib.h> 30 #include <string.h> 31 #include <sys/ioctl.h> 32 #include <sys/time.h> 33 34 #include <arpa/inet.h> 35 #include <net/route.h> 36 #include <sys/utsname.h> 37 38 #include "jvm.h" 39 #include "rdma_util_md.h" 40 41 #include "java_net_SocketOptions.h" 42 #include "jdk_net_RdmaSocketOptions.h" 43 #include "java_net_InetAddress.h" 44 #include <assert.h> 45 #include <limits.h> 46 #include <stdio.h> 47 #include <stdlib.h> 48 #include <signal.h> 49 #include <pthread.h> 50 #include <sys/types.h> 51 #include <sys/socket.h> 52 #include <rdma/rsocket.h> 53 #include <sys/time.h> 54 #include <sys/resource.h> 55 #include <sys/uio.h> 56 #include <unistd.h> 57 #include <errno.h> 58 #include <poll.h> 59 #include "net_util.h" 60 61 #define BLOCKING_IO_RETURN_INT(FD, FUNC) { \ 62 int ret; \ 63 threadEntry_t self; \ 64 fdEntry_t *fdEntry = getFdEntry(FD); \ 65 if (fdEntry == NULL) { \ 66 errno = EBADF; \ 67 return -1; \ 68 } \ 69 do { \ 70 startOp(fdEntry, &self); \ 71 ret = FUNC; \ 72 endOp(fdEntry, &self); \ 73 } while (ret == -1 && errno == EINTR); \ 74 return ret; \ 75 } 76 77 typedef struct threadEntry { 78 pthread_t thr; /* this thread */ 79 struct threadEntry *next; /* next thread */ 80 int intr; /* interrupted */ 81 } threadEntry_t; 82 83 typedef struct { 84 pthread_mutex_t lock; /* fd lock */ 85 threadEntry_t *threads; /* threads blocked on fd */ 86 } fdEntry_t; 87 88 static int sigWakeup = (__SIGRTMAX - 2); 89 90 static fdEntry_t* fdTable = NULL; 91 92 static const int fdTableMaxSize = 0x1000; /* 4K */ 93 94 static int fdTableLen = 0; 95 96 static int fdLimit = 0; 97 98 static fdEntry_t** fdOverflowTable = NULL; 99 100 static int fdOverflowTableLen = 0; 101 102 static const int fdOverflowTableSlabSize = 0x10000; /* 64k */ 103 104 pthread_mutex_t fdOverflowTableLock = PTHREAD_MUTEX_INITIALIZER; 105 106 static void sig_wakeup(int 
sig) { 107 } 108 109 110 static void __attribute((constructor)) init() { 111 struct rlimit nbr_files; 112 sigset_t sigset; 113 struct sigaction sa; 114 int i = 0; 115 116 if (-1 == getrlimit(RLIMIT_NOFILE, &nbr_files)) { 117 fprintf(stderr, "library initialization failed - " 118 "unable to get max # of allocated fds\n"); 119 abort(); 120 } 121 if (nbr_files.rlim_max != RLIM_INFINITY) { 122 fdLimit = nbr_files.rlim_max; 123 } else { 124 fdLimit = INT_MAX; 125 } 126 127 fdTableLen = fdLimit < fdTableMaxSize ? fdLimit : fdTableMaxSize; 128 fdTable = (fdEntry_t*) calloc(fdTableLen, sizeof(fdEntry_t)); 129 if (fdTable == NULL) { 130 fprintf(stderr, "library initialization failed - " 131 "unable to allocate file descriptor table - out of memory"); 132 abort(); 133 } else { 134 for (i = 0; i < fdTableLen; i ++) { 135 pthread_mutex_init(&fdTable[i].lock, NULL); 136 } 137 } 138 139 if (fdLimit > fdTableMaxSize) { 140 fdOverflowTableLen = ((fdLimit - fdTableMaxSize) / fdOverflowTableSlabSize) + 1; 141 fdOverflowTable = (fdEntry_t**) calloc(fdOverflowTableLen, sizeof(fdEntry_t*)); 142 if (fdOverflowTable == NULL) { 143 fprintf(stderr, "library initialization failed - " 144 "unable to allocate file descriptor overflow table - out of memory"); 145 abort(); 146 } 147 } 148 149 sa.sa_handler = sig_wakeup; 150 sa.sa_flags = 0; 151 sigemptyset(&sa.sa_mask); 152 sigaction(sigWakeup, &sa, NULL); 153 154 sigemptyset(&sigset); 155 sigaddset(&sigset, sigWakeup); 156 sigprocmask(SIG_UNBLOCK, &sigset, NULL); 157 } 158 159 static inline fdEntry_t *getFdEntry(int fd) 160 { 161 fdEntry_t* result = NULL; 162 163 if (fd < 0) { 164 return NULL; 165 } 166 167 assert(fd < fdLimit); 168 169 if (fd < fdTableMaxSize) { 170 assert(fd < fdTableLen); 171 result = &fdTable[fd]; 172 } else { 173 const int indexInOverflowTable = fd - fdTableMaxSize; 174 const int rootindex = indexInOverflowTable / fdOverflowTableSlabSize; 175 const int slabindex = indexInOverflowTable % fdOverflowTableSlabSize; 176 
fdEntry_t* slab = NULL; 177 assert(rootindex < fdOverflowTableLen); 178 assert(slabindex < fdOverflowTableSlabSize); 179 pthread_mutex_lock(&fdOverflowTableLock); 180 if (fdOverflowTable[rootindex] == NULL) { 181 fdEntry_t* const newSlab = 182 (fdEntry_t*)calloc(fdOverflowTableSlabSize, sizeof(fdEntry_t)); 183 if (newSlab == NULL) { 184 fprintf(stderr, "Unable to allocate file descriptor overflow" 185 " table slab - out of memory"); 186 pthread_mutex_unlock(&fdOverflowTableLock); 187 abort(); 188 } else { 189 int i; 190 for (i = 0; i < fdOverflowTableSlabSize; i ++) { 191 pthread_mutex_init(&newSlab[i].lock, NULL); 192 } 193 fdOverflowTable[rootindex] = newSlab; 194 } 195 } 196 pthread_mutex_unlock(&fdOverflowTableLock); 197 slab = fdOverflowTable[rootindex]; 198 result = &slab[slabindex]; 199 } 200 201 return result; 202 203 } 204 205 static inline void startOp(fdEntry_t *fdEntry, threadEntry_t *self) 206 { 207 self->thr = pthread_self(); 208 self->intr = 0; 209 210 pthread_mutex_lock(&(fdEntry->lock)); 211 { 212 self->next = fdEntry->threads; 213 fdEntry->threads = self; 214 } 215 pthread_mutex_unlock(&(fdEntry->lock)); 216 } 217 218 static inline void endOp 219 (fdEntry_t *fdEntry, threadEntry_t *self) 220 { 221 int orig_errno = errno; 222 pthread_mutex_lock(&(fdEntry->lock)); 223 { 224 threadEntry_t *curr, *prev=NULL; 225 curr = fdEntry->threads; 226 while (curr != NULL) { 227 if (curr == self) { 228 if (curr->intr) { 229 orig_errno = EBADF; 230 } 231 if (prev == NULL) { 232 fdEntry->threads = curr->next; 233 } else { 234 prev->next = curr->next; 235 } 236 break; 237 } 238 prev = curr; 239 curr = curr->next; 240 } 241 } 242 pthread_mutex_unlock(&(fdEntry->lock)); 243 errno = orig_errno; 244 } 245 246 #define RESTARTABLE(_cmd, _result) do { \ 247 do { \ 248 _result = _cmd; \ 249 } while((_result == -1) && (errno == EINTR)); \ 250 } while(0) 251 252 int rdma_supported() 253 { 254 int one = 1; 255 int rv, s; 256 s = socket(PF_INET, SOCK_STREAM, 0); 257 if (s < 0) 
{ 258 return JNI_FALSE; 259 } 260 return JNI_TRUE; 261 } 262 263 int RDMA_SocketAvailable(int s, jint *pbytes) { 264 int result; 265 RESTARTABLE(ioctl(s, FIONREAD, pbytes), result); 266 return (result == -1) ? 0 : 1; 267 } 268 269 int 270 RDMA_MapSocketOption(jint cmd, int *level, int *optname) { 271 static struct { 272 jint cmd; 273 int level; 274 int optname; 275 } const opts[] = { 276 { java_net_SocketOptions_TCP_NODELAY, IPPROTO_TCP, TCP_NODELAY }, 277 { java_net_SocketOptions_SO_LINGER, SOL_SOCKET, SO_LINGER }, 278 { java_net_SocketOptions_SO_SNDBUF, SOL_SOCKET, SO_SNDBUF }, 279 { java_net_SocketOptions_SO_RCVBUF, SOL_SOCKET, SO_RCVBUF }, 280 { java_net_SocketOptions_SO_REUSEADDR, SOL_SOCKET, SO_REUSEADDR }, 281 { jdk_net_RdmaSocketOptions_SQSIZE, SOL_RDMA, RDMA_SQSIZE }, 282 { jdk_net_RdmaSocketOptions_RQSIZE, SOL_RDMA, RDMA_RQSIZE }, 283 { jdk_net_RdmaSocketOptions_INLINE, SOL_RDMA, RDMA_INLINE }, 284 }; 285 int i; 286 for (i=0; i<(int)(sizeof(opts) / sizeof(opts[0])); i++) { 287 if (cmd == opts[i].cmd) { 288 *level = opts[i].level; 289 *optname = opts[i].optname; 290 return 0; 291 } 292 } 293 294 return -1; 295 } 296 297 int 298 RDMA_GetSockOpt(int fd, int level, int opt, void *result, 299 int *len) 300 { 301 int rv; 302 socklen_t socklen = *len; 303 304 rv = rgetsockopt(fd, level, opt, result, &socklen); 305 *len = socklen; 306 307 if (rv < 0) { 308 return rv; 309 } 310 311 if ((level == SOL_SOCKET) && ((opt == SO_SNDBUF) 312 || (opt == SO_RCVBUF))) { 313 int n = *((int *)result); 314 n /= 2; 315 *((int *)result) = n; 316 } 317 return rv; 318 } 319 320 int 321 RDMA_SetSockOpt(int fd, int level, int opt, const void *arg, 322 int len) 323 { 324 int *bufsize; 325 if (level == SOL_SOCKET && opt == SO_RCVBUF) { 326 int *bufsize = (int *)arg; 327 if (*bufsize < 1024) { 328 *bufsize = 1024; 329 } 330 } 331 332 return rsetsockopt(fd, level, opt, arg, len); 333 } 334 335 int 336 RDMA_Bind(int fd, SOCKETADDRESS *sa, int len) 337 { 338 int rv; 339 int arg, alen; 340 
341 if (sa->sa.sa_family == AF_INET) { 342 if ((ntohl(sa->sa4.sin_addr.s_addr) & 0x7f0000ff) == 0x7f0000ff) { 343 errno = EADDRNOTAVAIL; 344 return -1; 345 } 346 } 347 rv = rbind(fd, &sa->sa, len); 348 return rv; 349 } 350 351 jint 352 RDMA_Wait(JNIEnv *env, jint fd, jint flags, jint timeout) 353 { 354 jlong prevNanoTime = JVM_NanoTime(env, 0); 355 jlong nanoTimeout = (jlong) timeout * NET_NSEC_PER_MSEC; 356 jint read_rv; 357 358 while (1) { 359 jlong newNanoTime; 360 struct pollfd pfd; 361 pfd.fd = fd; 362 pfd.events = 0; 363 if (flags & NET_WAIT_READ) 364 pfd.events |= POLLIN; 365 if (flags & NET_WAIT_WRITE) 366 pfd.events |= POLLOUT; 367 if (flags & NET_WAIT_CONNECT) 368 pfd.events |= POLLOUT; 369 370 errno = 0; 371 read_rv = RDMA_Poll(&pfd, 1, nanoTimeout / NET_NSEC_PER_MSEC); 372 373 newNanoTime = JVM_NanoTime(env, 0); 374 nanoTimeout -= (newNanoTime - prevNanoTime); 375 if (nanoTimeout < NET_NSEC_PER_MSEC) { 376 return read_rv > 0 ? 0 : -1; 377 } 378 prevNanoTime = newNanoTime; 379 380 if (read_rv > 0) { 381 break; 382 } 383 } 384 return (nanoTimeout / NET_NSEC_PER_MSEC); 385 } 386 387 static int rdma_closefd(int fd1, int fd2) { 388 int rv, orig_errno; 389 fdEntry_t *fdEntry = getFdEntry(fd2); 390 if (fdEntry == NULL) { 391 errno = EBADF; 392 return -1; 393 } 394 395 pthread_mutex_lock(&(fdEntry->lock)); 396 397 { 398 do { 399 if (fd1 < 0) { 400 rv = rclose(fd2); 401 } else { 402 // rv = dup2(fd1, fd2); 403 } 404 } while (rv == -1 && errno == EINTR); 405 406 threadEntry_t *curr = fdEntry->threads; 407 while (curr != NULL) { 408 curr->intr = 1; 409 pthread_kill( curr->thr, sigWakeup ); 410 curr = curr->next; 411 } 412 } 413 414 orig_errno = errno; 415 pthread_mutex_unlock(&(fdEntry->lock)); 416 errno = orig_errno; 417 418 return rv; 419 } 420 421 int RDMA_Dup2(int fd, int fd2) { 422 if (fd < 0) { 423 errno = EBADF; 424 return -1; 425 } 426 return rdma_closefd(fd, fd2); 427 } 428 429 int RDMA_SocketClose(int fd) { 430 return rdma_closefd(-1, fd); 431 } 432 433 
/* Blocking read from rsocket s; interruptible via RDMA_SocketClose. */
int RDMA_Read(int s, void* buf, size_t len) {
    BLOCKING_IO_RETURN_INT( s, rrecv(s, buf, len, 0) );
}

/* Non-blocking read (MSG_DONTWAIT): returns immediately if no data. */
int RDMA_NonBlockingRead(int s, void* buf, size_t len) {
    BLOCKING_IO_RETURN_INT( s, rrecv(s, buf, len, MSG_DONTWAIT) );
}

/* Scatter read into 'count' iovecs. */
int RDMA_ReadV(int s, const struct iovec * vector, int count) {
    BLOCKING_IO_RETURN_INT( s, rreadv(s, vector, count) );
}

/* Blocking recvfrom; peer address returned in from/fromlen. */
int RDMA_RecvFrom(int s, void *buf, int len, unsigned int flags,
                  struct sockaddr *from, socklen_t *fromlen) {
    BLOCKING_IO_RETURN_INT( s, rrecvfrom(s, buf, len, flags, from, fromlen) );
}

/* Blocking send of len bytes from msg. */
int RDMA_Send(int s, void *msg, int len, unsigned int flags) {
    BLOCKING_IO_RETURN_INT( s, rsend(s, msg, len, flags) );
}

/* Gather write from 'count' iovecs. */
int RDMA_WriteV(int s, const struct iovec * vector, int count) {
    BLOCKING_IO_RETURN_INT( s, rwritev(s, vector, count) );
}

/* Blocking sendto the address in 'to'. */
int NET_RSendTo(int s, const void *msg, int len, unsigned int
                flags, const struct sockaddr *to, int tolen) {
    BLOCKING_IO_RETURN_INT( s, rsendto(s, msg, len, flags, to, tolen) );
}

/* Blocking accept; peer address returned in addr/addrlen. */
int RDMA_Accept(int s, struct sockaddr *addr, socklen_t *addrlen) {
    BLOCKING_IO_RETURN_INT( s, raccept(s, addr, addrlen) );
}

/* Blocking connect to 'addr'. */
int RDMA_Connect(int s, struct sockaddr *addr, int addrlen) {
    BLOCKING_IO_RETURN_INT( s, rconnect(s, addr, addrlen) );
}

/* rpoll wrapper; the interrupt bookkeeping is keyed on ufds[0].fd only. */
int RDMA_Poll(struct pollfd *ufds, unsigned int nfds, int timeout) {
    BLOCKING_IO_RETURN_INT( ufds[0].fd, rpoll(ufds, nfds, timeout) );
}

/*
 * Wait up to 'timeout' milliseconds for data (POLLIN) on socket s,
 * measuring elapsed time from nanoTimeStamp. EINTR wakeups restart the
 * poll with the timeout reduced by the elapsed time. Returns the rpoll
 * result (>0 readable, 0 poll timed out, <0 error), 0 if the overall
 * timeout budget was consumed across retries, or -1/EBADF for a bad fd.
 */
int RDMA_Timeout(JNIEnv *env, int s, long timeout, jlong nanoTimeStamp) {
    jlong prevNanoTime = nanoTimeStamp;
    jlong nanoTimeout = (jlong)timeout * NET_NSEC_PER_MSEC;
    fdEntry_t *fdEntry = getFdEntry(s);

    if (fdEntry == NULL) {
        errno = EBADF;
        return -1;
    }

    for (;;) {
        struct pollfd pfd;
        int rv;
        threadEntry_t self;

        pfd.fd = s;
        pfd.events = POLLIN | POLLERR;

        /* Register with the fd so a concurrent close can interrupt us. */
        startOp(fdEntry, &self);
        rv = rpoll(&pfd, 1, nanoTimeout / NET_NSEC_PER_MSEC);
        endOp(fdEntry, &self);
        if (rv < 0 && errno == EINTR) {
            /* Spurious wakeup: shrink the remaining budget and retry. */
            jlong newNanoTime = JVM_NanoTime(env, 0);
            nanoTimeout -= newNanoTime - prevNanoTime;
            if (nanoTimeout < NET_NSEC_PER_MSEC) {
                return 0;
            }
            prevNanoTime = newNanoTime;
        } else {
            return rv;
        }
    }
}