< prev index next >

src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystem.java

Print this page
rev 54573 : 8222532: (zipfs) Performance regression when writing ZipFileSystem entries in parallel
Reviewed-by: TBD

@@ -33,12 +33,14 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
+import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
+import java.nio.channels.NonWritableChannelException;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.*;
 import java.nio.file.attribute.FileAttribute;

@@ -597,15 +599,15 @@
         }
         if (options.contains(APPEND) && options.contains(TRUNCATE_EXISTING))
             throw new IllegalArgumentException("APPEND + TRUNCATE_EXISTING not allowed");
     }
 
-
     // Returns an output SeekableByteChannel for either
     // (1) writing the contents of a new entry, if the entry doesn't exit, or
     // (2) updating/replacing the contents of an existing entry.
-    // Note: The content is not compressed.
+    // Note: The content of the channel is buffered uncompressed and is only
+    // compressed (for DEFLATED entries) when the channel is closed.
     private class EntryOutputChannel extends ByteArrayChannel {
         Entry e;
 
         EntryOutputChannel(Entry e) throws IOException {
             super(e.size > 0? (int)e.size : 8192, false);

@@ -620,23 +622,22 @@
                 e.flag |= FLAG_USE_UTF8;
         }
 
         @Override
         public void close() throws IOException {
-            e.bytes = toByteArray();
-            e.size = e.bytes.length;
-            e.crc = -1;
+            OutputStream os = getOutputStream(e);
+            os.write(toByteArray()); // NOTE(review): "os" is not closed if this throws
+            os.close(); // will update the entry (closing the stream calls update(e))
             super.close();
-            update(e);
         }
     }
 
     private int getCompressMethod(FileAttribute<?>... attrs) {
          return defaultMethod;
     }
 
-    // Returns a Writable/ReadByteChannel for now. Might consdier to use
+    // Returns a Writable/ReadByteChannel for now. Might consider to use
     // newFileChannel() instead, which dump the entry data into a regular
     // file on the default file system and create a FileChannel on top of
     // it.
     SeekableByteChannel newByteChannel(byte[] path,
                                        Set<? extends OpenOption> options,

@@ -645,14 +646,13 @@
     {
         checkOptions(options);
         if (options.contains(StandardOpenOption.WRITE) ||
             options.contains(StandardOpenOption.APPEND)) {
             checkWritable();
-            beginRead();    // only need a readlock, the "update()" will obtain
-                            // thewritelock when the channel is closed
+            beginRead();    // only need a read lock, the "update()" will obtain
+                            // the write lock when the channel is closed
             try {
-                ensureOpen();
                 Entry e = getEntry(path);
                 if (e != null) {
                     if (e.isDir() || options.contains(CREATE_NEW))
                         throw new FileAlreadyExistsException(getString(path));
                     SeekableByteChannel sbc =

@@ -674,11 +674,10 @@
                 if (!options.contains(CREATE) && !options.contains(CREATE_NEW))
                     throw new NoSuchFileException(getString(path));
                 checkParents(path);
                 return new EntryOutputChannel(
                     new Entry(path, Entry.NEW, false, getCompressMethod(attrs)));
-
             } finally {
                 endRead();
             }
         } else {
             beginRead();

@@ -741,11 +740,11 @@
                                            .provider()
                                            .newFileChannel(tmpfile, options, attrs);
             final Entry u = isFCH ? e : new Entry(path, tmpfile, Entry.FILECH);
             if (forWrite) {
                 u.flag = FLAG_DATADESCR;
-                u.method = getCompressMethod(attrs);
+                u.method = getCompressMethod();
             }
             // is there a better way to hook into the FileChannel's close method?
             return new FileChannel() {
                 public int write(ByteBuffer src) throws IOException {
                     return fch.write(src);

@@ -842,11 +841,15 @@
         }
     }
 
     // the outstanding input streams that need to be closed
     private Set<InputStream> streams =
-        Collections.synchronizedSet(new HashSet<InputStream>());
+        Collections.synchronizedSet(new HashSet<>());
+
+    // the ex-channel and ex-path that need to close when their outstanding
+    // input streams are all closed by the obtainers.
+    private Set<ExChannelCloser> exChClosers = new HashSet<>();
 
     private Set<Path> tmppaths = Collections.synchronizedSet(new HashSet<Path>());
     private Path getTempPathForEntry(byte[] path) throws IOException {
         Path tmpPath = createTempFileInSameDirectoryAs(zfpath);
         if (path != null) {

@@ -1200,50 +1203,78 @@
             locoff += n;
         }
         return written;
     }
 
-    private long writeEntry(Entry e, OutputStream os, byte[] buf)
+    // Writes the data of entry "e" to "os", followed by its data descriptor
+    // (EXT) if FLAG_DATADESCR is set; returns the number of bytes written.
+    private long writeEntry(Entry e, OutputStream os)
         throws IOException {
 
         if (e.bytes == null && e.file == null)    // dir, 0-length data
             return 0;
 
         long written = 0;
-        try (OutputStream os2 = e.method == METHOD_STORED ?
+        if (e.method != METHOD_STORED && e.csize > 0 &&
+                (e.crc != 0 || e.size == 0)) {
+            // pre-compressed entry (e.g. deflated by DeflatingEntryOutputStream,
+            // which sets size, csize and crc when closed): write it directly to
+            // the output stream. The CRC-32 of empty data is 0, hence the
+            // "size == 0" case. STORED entries always take the else branch,
+            // i.e. are written through EntryOutputStreamCRC32.
+            writeTo(e, os);
+        } else {
+            try (OutputStream os2 = (e.method == METHOD_STORED) ?
             new EntryOutputStreamCRC32(e, os) : new EntryOutputStreamDef(e, os)) {
-            if (e.bytes != null) {                 // in-memory
-                os2.write(e.bytes, 0, e.bytes.length);
-            } else if (e.file != null) {           // tmp file
-                if (e.type == Entry.NEW || e.type == Entry.FILECH) {
-                    try (InputStream is = Files.newInputStream(e.file)) {
-                        is.transferTo(os2);
-                    }
-                }
-                Files.delete(e.file);
-                tmppaths.remove(e.file);
+                writeTo(e, os2);
             }
         }
         written += e.csize;
         if ((e.flag & FLAG_DATADESCR) != 0) {
             written += e.writeEXT(os);
         }
         return written;
     }
 
+    // Writes the entry's data (in-memory bytes or temp file content) to "os"
+    // as is; a temp file is deleted (and forgotten) afterwards.
+    private void writeTo(Entry e, OutputStream os) throws IOException {
+        if (e.bytes != null) {
+            os.write(e.bytes, 0, e.bytes.length);
+        } else if (e.file != null) {
+            if (e.type == Entry.NEW || e.type == Entry.FILECH) {
+                try (InputStream is = Files.newInputStream(e.file)) {
+                    is.transferTo(os);
+                }
+            }
+            Files.delete(e.file);
+            tmppaths.remove(e.file);
+        }
+    }
+
     // sync the zip file system, if there is any udpate
     private void sync() throws IOException {
-
+        // close each ex-channel (and delete its file) whose input streams are
+        // all closed; remove it via the iterator, because Set.remove() while
+        // iterating would throw ConcurrentModificationException
+        java.util.Iterator<ExChannelCloser> it = exChClosers.iterator();
+        while (it.hasNext()) {
+            ExChannelCloser ecc = it.next();
+            if (ecc.streams.isEmpty()) {
+                ecc.ch.close();
+                Files.delete(ecc.path);
+                it.remove();
+            }
+        }
         if (!hasUpdate)
             return;
         Path tmpFile = createTempFileInSameDirectoryAs(zfpath);
-        try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tmpFile, WRITE)))
-        {
+        try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tmpFile, WRITE))) {
             ArrayList<Entry> elist = new ArrayList<>(inodes.size());
             long written = 0;
-            byte[] buf = new byte[8192];
-            Entry e = null;
+            byte[] buf = null;
+            Entry e;
 
             // write loc
             for (IndexNode inode : inodes.values()) {
                 if (inode instanceof Entry) {    // an updated inode
                     e = (Entry)inode;

@@ -1252,15 +1272,17 @@
                             // entry copy: the only thing changed is the "name"
                             // and "nlen" in LOC header, so we udpate/rewrite the
                             // LOC in new file and simply copy the rest (data and
                             // ext) without enflating/deflating from the old zip
                             // file LOC entry.
+                            if (buf == null)
+                                buf = new byte[8192];
                             written += copyLOCEntry(e, true, os, written, buf);
                         } else {                          // NEW, FILECH or CEN
                             e.locoff = written;
                             written += e.writeLOC(os);    // write loc header
-                            written += writeEntry(e, os, buf);
+                            written += writeEntry(e, os);
                         }
                         elist.add(e);
                     } catch (IOException x) {
                         x.printStackTrace();    // skip any in-accurate entry
                     }

@@ -1272,10 +1294,12 @@
                         continue;               // no root '/' directory even it
                                                 // exits in original zip/jar file.
                     }
                     e = Entry.readCEN(this, inode);
                     try {
+                        if (buf == null)
+                            buf = new byte[8192];
                         written += copyLOCEntry(e, false, os, written, buf);
                         elist.add(e);
                     } catch (IOException x) {
                         x.printStackTrace();    // skip any wrong entry
                     }

@@ -1289,13 +1313,27 @@
             }
             end.centot = elist.size();
             end.cenlen = written - end.cenoff;
             end.write(os, written, forceEnd64);
         }
-
+        if (!streams.isEmpty()) {
+            // There are outstanding input streams open on the existing "ch",
+            // so don't close "ch" and delete the file for now. Move the file
+            // aside instead; a later sync() closes "ch" and deletes the moved
+            // file (via the ExChannelCloser) once those streams are all closed.
+            ExChannelCloser ecc = new ExChannelCloser(
+                                      createTempFileInSameDirectoryAs(zfpath),
+                                      ch,
+                                      streams);
+            Files.move(zfpath, ecc.path, REPLACE_EXISTING);
+            exChClosers.add(ecc);
+            streams = Collections.synchronizedSet(new HashSet<>());
+        } else {
-        ch.close();
-        Files.delete(zfpath);
+            ch.close();
+            Files.delete(zfpath);
+        }
+
         Files.move(tmpFile, zfpath, REPLACE_EXISTING);
         hasUpdate = false;    // clear
     }
 
     IndexNode getInode(byte[] path) {

@@ -1349,15 +1387,19 @@
             e.file = getTempPathForEntry(null);
             os = Files.newOutputStream(e.file, WRITE);
         } else {
             os = new ByteArrayOutputStream((e.size > 0)? (int)e.size : 8192);
         }
+        if (e.method == METHOD_DEFLATED) {  // deflate eagerly, as data is written
+            return new DeflatingEntryOutputStream(e, os);
+        } else {
-        return new EntryOutputStream(e, os);
-    }
+            return new EntryOutputStream(e, os);
+        }
+    }
 
     private class EntryOutputStream extends FilterOutputStream {
-        private Entry e;
+        private final Entry e;
         private long written;
         private boolean isClosed;
 
         EntryOutputStream(Entry e, OutputStream os) throws IOException {
             super(os);

@@ -1390,17 +1432,60 @@
             super.close();
             update(e);
         }
     }
 
+    // Output stream returned when writing "deflated" entries (into memory or
+    // into a temporary file), to enable eager (possibly parallel) deflation
+    // and reduce the memory required. On close it records the entry's size,
+    // compressed size and CRC-32, then calls update(e).
+    //
+    // write(int) is deliberately not overridden: DeflaterOutputStream.write(int)
+    // delegates to write(byte[], int, int), so also updating the CRC there
+    // would count every single-byte write twice.
+    private class DeflatingEntryOutputStream extends DeflaterOutputStream {
+        private final CRC32 crc;
+        private final Entry e;
+        private boolean isClosed;
+
+        DeflatingEntryOutputStream(Entry e, OutputStream os) throws IOException {
+            super(os, getDeflater());
+            this.e = Objects.requireNonNull(e, "Zip entry is null");
+            this.crc = new CRC32();
+        }
+
+        @Override
+        public synchronized void write(byte b[], int off, int len)
+                throws IOException {
+            super.write(b, off, len);
+            crc.update(b, off, len);
+        }
+
+        @Override
+        public synchronized void close() throws IOException {
+            if (isClosed)
+                return;
+            isClosed = true;
+            finish();
+            e.size  = def.getBytesRead();
+            e.csize = def.getBytesWritten();
+            e.crc = crc.getValue();
+            if (out instanceof ByteArrayOutputStream)
+                e.bytes = ((ByteArrayOutputStream)out).toByteArray();
+            super.close();
+            update(e);
+            releaseDeflater(def);
+        }
+    }
+
     // Wrapper output stream class to write out a "stored" entry.
     // (1) this class does not close the underlying out stream when
     //     being closed.
     // (2) no need to be "synchronized", only used by sync()
     private class EntryOutputStreamCRC32 extends FilterOutputStream {
-        private Entry e;
-        private CRC32 crc;
+        private final CRC32 crc;
+        private final Entry e;
         private long written;
         private boolean isClosed;
 
         EntryOutputStreamCRC32(Entry e, OutputStream os) throws IOException {
             super(os);

@@ -1436,12 +1521,12 @@
     // Wrapper output stream class to write out a "deflated" entry.
     // (1) this class does not close the underlying out stream when
     //     being closed.
     // (2) no need to be "synchronized", only used by sync()
     private class EntryOutputStreamDef extends DeflaterOutputStream {
-        private CRC32 crc;
-        private Entry e;
+        private final CRC32 crc;
+        private final Entry e;
         private boolean isClosed;
 
         EntryOutputStreamDef(Entry e, OutputStream os) throws IOException {
             super(os, getDeflater());
             this.e =  Objects.requireNonNull(e, "Zip entry is null");

@@ -1469,18 +1554,16 @@
     }
 
     private InputStream getInputStream(Entry e)
         throws IOException
     {
-        InputStream eis = null;
-
+        InputStream eis;
         if (e.type == Entry.NEW) {
-            // now bytes & file is uncompressed.
             if (e.bytes != null)
-                return new ByteArrayInputStream(e.bytes);
+                eis = new ByteArrayInputStream(e.bytes);
             else if (e.file != null)
-                return Files.newInputStream(e.file);
+                eis = Files.newInputStream(e.file);
             else
                 throw new ZipException("update entry data is missing");
         } else if (e.type == Entry.FILECH) {
             // FILECH result is un-compressed.
             eis = Files.newInputStream(e.file);

@@ -1577,11 +1660,11 @@
             }
             if (len > rem) {
                 len = (int) rem;
             }
             // readFullyAt()
-            long n = 0;
+            long n;
             ByteBuffer bb = ByteBuffer.wrap(b);
             bb.position(off);
             bb.limit(off + len);
             synchronized(zfch) {
                 n = zfch.position(pos).read(bb);

@@ -1903,11 +1986,11 @@
         Entry(byte[] name, int type, boolean isdir, int method) {
             this(name, isdir, method);
             this.type = type;
         }
 
-        Entry (Entry e, int type) {
+        Entry(Entry e, int type) {
             name(e.name);
             this.isdir     = e.isdir;
             this.version   = e.version;
             this.ctime     = e.ctime;
             this.atime     = e.atime;

@@ -1926,11 +2009,11 @@
             this.locoff    = e.locoff;
             this.comment   = e.comment;
             this.type      = type;
         }
 
-        Entry (byte[] name, Path file, int type) {
+        Entry(byte[] name, Path file, int type) {
             this(name, type, false, METHOD_STORED);
             this.file = file;
         }
 
         int version() throws ZipException {

@@ -2422,10 +2505,24 @@
             fm.close();
             return sb.toString();
         }
     }
 
+    // Keeps the old zip file channel "ch" open, and its file (moved aside to
+    // "path") in place, until all of its outstanding input "streams" are closed.
+    private static class ExChannelCloser {
+        final Path path;
+        final SeekableByteChannel ch;
+        final Set<InputStream> streams;
+
+        ExChannelCloser(Path path, SeekableByteChannel ch, Set<InputStream> streams) {
+            this.path = path;
+            this.ch = ch;
+            this.streams = streams;
+        }
+    }
+
     // ZIP directory has two issues:
     // (1) ZIP spec does not require the ZIP file to include
     //     directory entry
     // (2) all entries are not stored/organized in a "tree"
     //     structure.
< prev index next >