# HG changeset patch # User redestad # Date 1555592374 -7200 # Thu Apr 18 14:59:34 2019 +0200 # Node ID 49f785ec97401bbd130040f4e0996cc6407c6145 # Parent 75a42622414ed747c7e68b281735baf394a70ebf 8222532: (zipfs) Performance regression when writing ZipFileSystem entries in parallel Reviewed-by: TBD diff --git a/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystem.java b/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystem.java --- a/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystem.java +++ b/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystem.java @@ -35,8 +35,10 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; +import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; +import java.nio.channels.NonWritableChannelException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; @@ -599,11 +601,11 @@ throw new IllegalArgumentException("APPEND + TRUNCATE_EXISTING not allowed"); } - // Returns an output SeekableByteChannel for either // (1) writing the contents of a new entry, if the entry doesn't exit, or // (2) updating/replacing the contents of an existing entry. - // Note: The content is not compressed. + // Note: The content of the channel is not compressed until the + // channel is closed private class EntryOutputChannel extends ByteArrayChannel { Entry e; @@ -622,11 +624,10 @@ @Override public void close() throws IOException { - e.bytes = toByteArray(); - e.size = e.bytes.length; - e.crc = -1; + OutputStream os = getOutputStream(e); + os.write(toByteArray()); + os.close(); // will update the entry super.close(); - update(e); } } @@ -634,7 +635,7 @@ return defaultMethod; } - // Returns a Writable/ReadByteChannel for now. Might consdier to use + // Returns a Writable/ReadByteChannel for now. 
Might consider to use // newFileChannel() instead, which dump the entry data into a regular // file on the default file system and create a FileChannel on top of // it. @@ -647,10 +648,9 @@ if (options.contains(StandardOpenOption.WRITE) || options.contains(StandardOpenOption.APPEND)) { checkWritable(); - beginRead(); // only need a readlock, the "update()" will obtain - // thewritelock when the channel is closed + beginRead(); // only need a read lock, the "update()" will obtain + // the write lock when the channel is closed try { - ensureOpen(); Entry e = getEntry(path); if (e != null) { if (e.isDir() || options.contains(CREATE_NEW)) @@ -676,7 +676,6 @@ checkParents(path); return new EntryOutputChannel( new Entry(path, Entry.NEW, false, getCompressMethod(attrs))); - } finally { endRead(); } @@ -743,7 +742,7 @@ final Entry u = isFCH ? e : new Entry(path, tmpfile, Entry.FILECH); if (forWrite) { u.flag = FLAG_DATADESCR; - u.method = getCompressMethod(attrs); + u.method = getCompressMethod(); } // is there a better way to hook into the FileChannel's close method? return new FileChannel() { @@ -844,7 +843,11 @@ // the outstanding input streams that need to be closed private Set streams = - Collections.synchronizedSet(new HashSet()); + Collections.synchronizedSet(new HashSet<>()); + + // the ex-channel and ex-path that need to close when their outstanding + // input streams are all closed by the obtainers. + private Set exChClosers = new HashSet<>(); private Set tmppaths = Collections.synchronizedSet(new HashSet()); private Path getTempPathForEntry(byte[] path) throws IOException { @@ -1202,25 +1205,20 @@ return written; } - private long writeEntry(Entry e, OutputStream os, byte[] buf) + private long writeEntry(Entry e, OutputStream os) throws IOException { if (e.bytes == null && e.file == null) // dir, 0-length data return 0; long written = 0; - try (OutputStream os2 = e.method == METHOD_STORED ? 
- new EntryOutputStreamCRC32(e, os) : new EntryOutputStreamDef(e, os)) { - if (e.bytes != null) { // in-memory - os2.write(e.bytes, 0, e.bytes.length); - } else if (e.file != null) { // tmp file - if (e.type == Entry.NEW || e.type == Entry.FILECH) { - try (InputStream is = Files.newInputStream(e.file)) { - is.transferTo(os2); - } - } - Files.delete(e.file); - tmppaths.remove(e.file); + if (e.crc != 0 && e.csize > 0) { + // pre-compressed entry, write directly to output stream + writeTo(e, os); + } else { + try (OutputStream os2 = (e.method == METHOD_STORED) ? + new EntryOutputStreamCRC32(e, os) : new EntryOutputStreamDef(e, os)) { + writeTo(e, os2); } } written += e.csize; @@ -1230,18 +1228,40 @@ return written; } + private void writeTo(Entry e, OutputStream os) throws IOException { + if (e.bytes != null) { + os.write(e.bytes, 0, e.bytes.length); + } else if (e.file != null) { + if (e.type == Entry.NEW || e.type == Entry.FILECH) { + try (InputStream is = Files.newInputStream(e.file)) { + is.transferTo(os); + } + } + Files.delete(e.file); + tmppaths.remove(e.file); + } + } + // sync the zip file system, if there is any udpate private void sync() throws IOException { - + // check ex-closer + if (!exChClosers.isEmpty()) { + for (ExChannelCloser ecc : exChClosers) { + if (ecc.streams.isEmpty()) { + ecc.ch.close(); + Files.delete(ecc.path); + exChClosers.remove(ecc); + } + } + } if (!hasUpdate) return; Path tmpFile = createTempFileInSameDirectoryAs(zfpath); - try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tmpFile, WRITE))) - { + try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tmpFile, WRITE))) { ArrayList elist = new ArrayList<>(inodes.size()); long written = 0; - byte[] buf = new byte[8192]; - Entry e = null; + byte[] buf = null; + Entry e; // write loc for (IndexNode inode : inodes.values()) { @@ -1254,11 +1274,13 @@ // LOC in new file and simply copy the rest (data and // ext) without enflating/deflating from the old zip 
// file LOC entry. + if (buf == null) + buf = new byte[8192]; written += copyLOCEntry(e, true, os, written, buf); } else { // NEW, FILECH or CEN e.locoff = written; written += e.writeLOC(os); // write loc header - written += writeEntry(e, os, buf); + written += writeEntry(e, os); } elist.add(e); } catch (IOException x) { @@ -1274,6 +1296,8 @@ } e = Entry.readCEN(this, inode); try { + if (buf == null) + buf = new byte[8192]; written += copyLOCEntry(e, false, os, written, buf); elist.add(e); } catch (IOException x) { @@ -1291,9 +1315,23 @@ end.cenlen = written - end.cenoff; end.write(os, written, forceEnd64); } + if (!streams.isEmpty()) { + // + // There are outstanding input streams open on existing "ch", + // so, don't close the "ch" and delete the "file" for now, let + // the "ex-channel-closer" handle them + ExChannelCloser ecc = new ExChannelCloser( + createTempFileInSameDirectoryAs(zfpath), + ch, + streams); + Files.move(zfpath, ecc.path, REPLACE_EXISTING); + exChClosers.add(ecc); + streams = Collections.synchronizedSet(new HashSet()); + } else { + ch.close(); + Files.delete(zfpath); + } - ch.close(); - Files.delete(zfpath); Files.move(tmpFile, zfpath, REPLACE_EXISTING); hasUpdate = false; // clear } @@ -1351,11 +1389,15 @@ } else { os = new ByteArrayOutputStream((e.size > 0)? (int)e.size : 8192); } - return new EntryOutputStream(e, os); + if (e.method == METHOD_DEFLATED) { + return new DeflatingEntryOutputStream(e, os); + } else { + return new EntryOutputStream(e, os); + } } private class EntryOutputStream extends FilterOutputStream { - private Entry e; + private final Entry e; private long written; private boolean isClosed; @@ -1392,13 +1434,56 @@ } } + // Output stream returned when writing "deflated" entries into memory, + // to enable eager (possibly parallel) deflation and reduce memory required.
+ private class DeflatingEntryOutputStream extends DeflaterOutputStream { + private final CRC32 crc; + private final Entry e; + private boolean isClosed; + + DeflatingEntryOutputStream(Entry e, OutputStream os) throws IOException { + super(os, getDeflater()); + this.e = Objects.requireNonNull(e, "Zip entry is null"); + this.crc = new CRC32(); + } + + @Override + public synchronized void write(int b) throws IOException { + super.write(b); + crc.update(b); + } + + @Override + public synchronized void write(byte b[], int off, int len) + throws IOException { + super.write(b, off, len); + crc.update(b, off, len); + } + + @Override + public synchronized void close() throws IOException { + if (isClosed) + return; + isClosed = true; + finish(); + e.size = def.getBytesRead(); + e.csize = def.getBytesWritten(); + e.crc = crc.getValue(); + if (out instanceof ByteArrayOutputStream) + e.bytes = ((ByteArrayOutputStream)out).toByteArray(); + super.close(); + update(e); + releaseDeflater(def); + } + } + // Wrapper output stream class to write out a "stored" entry. // (1) this class does not close the underlying out stream when // being closed. // (2) no need to be "synchronized", only used by sync() private class EntryOutputStreamCRC32 extends FilterOutputStream { - private Entry e; - private CRC32 crc; + private final CRC32 crc; + private final Entry e; private long written; private boolean isClosed; @@ -1438,8 +1523,8 @@ // being closed. // (2) no need to be "synchronized", only used by sync() private class EntryOutputStreamDef extends DeflaterOutputStream { - private CRC32 crc; - private Entry e; + private final CRC32 crc; + private final Entry e; private boolean isClosed; EntryOutputStreamDef(Entry e, OutputStream os) throws IOException { @@ -1471,14 +1556,12 @@ private InputStream getInputStream(Entry e) throws IOException { - InputStream eis = null; - + InputStream eis; if (e.type == Entry.NEW) { - // now bytes & file is uncompressed. 
if (e.bytes != null) - return new ByteArrayInputStream(e.bytes); + eis = new ByteArrayInputStream(e.bytes); else if (e.file != null) - return Files.newInputStream(e.file); + eis = Files.newInputStream(e.file); else throw new ZipException("update entry data is missing"); } else if (e.type == Entry.FILECH) { @@ -1579,7 +1662,7 @@ len = (int) rem; } // readFullyAt() - long n = 0; + long n; ByteBuffer bb = ByteBuffer.wrap(b); bb.position(off); bb.limit(off + len); @@ -1905,7 +1988,7 @@ this.type = type; } - Entry (Entry e, int type) { + Entry(Entry e, int type) { name(e.name); this.isdir = e.isdir; this.version = e.version; @@ -1928,7 +2011,7 @@ this.type = type; } - Entry (byte[] name, Path file, int type) { + Entry(byte[] name, Path file, int type) { this(name, type, false, METHOD_STORED); this.file = file; } @@ -2424,6 +2507,20 @@ } } + private static class ExChannelCloser { + Path path; + SeekableByteChannel ch; + Set streams; + ExChannelCloser(Path path, + SeekableByteChannel ch, + Set streams) + { + this.path = path; + this.ch = ch; + this.streams = streams; + } + } + // ZIP directory has two issues: // (1) ZIP spec does not require the ZIP file to include // directory entry diff --git a/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystemProvider.java b/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystemProvider.java --- a/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystemProvider.java +++ b/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystemProvider.java @@ -104,7 +104,7 @@ if (filesystems.containsKey(realPath)) throw new FileSystemAlreadyExistsException(); } - ZipFileSystem zipfs = null; + ZipFileSystem zipfs; try { if (env.containsKey("multi-release")) { zipfs = new JarFileSystem(this, path, env); @@ -131,13 +131,13 @@ throws IOException { ensureFile(path); - try { - ZipFileSystem zipfs; - if (env.containsKey("multi-release")) { - zipfs = new JarFileSystem(this, path, env); - } else { - zipfs = new ZipFileSystem(this, path, env); - } + try { + 
ZipFileSystem zipfs; + if (env.containsKey("multi-release")) { + zipfs = new JarFileSystem(this, path, env); + } else { + zipfs = new ZipFileSystem(this, path, env); + } return zipfs; } catch (ZipException ze) { String pname = path.toString(); diff --git a/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipPath.java b/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipPath.java --- a/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipPath.java +++ b/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipPath.java @@ -676,7 +676,7 @@ @Override public Iterator iterator() { - return new Iterator() { + return new Iterator<>() { private int i = 0; @Override @@ -746,8 +746,8 @@ void setAttribute(String attribute, Object value, LinkOption... options) throws IOException { - String type = null; - String attr = null; + String type; + String attr; int colonPos = attribute.indexOf(':'); if (colonPos == -1) { type = "basic"; @@ -772,8 +772,8 @@ throws IOException { - String view = null; - String attrs = null; + String view; + String attrs; int colonPos = attributes.indexOf(':'); if (colonPos == -1) { view = "basic";