
src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystem.java

rev 54573 : 8222532: (zipfs) Performance regression when writing ZipFileSystem entries in parallel
Reviewed-by: TBD
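For context, a minimal sketch of the scenario this change targets (the zip file name, entry names, payload size, and thread count below are made up for illustration and are not taken from the webrev): several tasks write separate entries into one zip file system at the same time.

    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.UncheckedIOException;
    import java.net.URI;
    import java.nio.file.FileSystem;
    import java.nio.file.FileSystems;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.stream.IntStream;

    public class ParallelZipWrite {
        public static void main(String[] args) throws Exception {
            URI uri = URI.create("jar:" + Path.of("bundle.zip").toUri());
            try (FileSystem zipfs = FileSystems.newFileSystem(uri, Map.of("create", "true"))) {
                ExecutorService pool = Executors.newFixedThreadPool(4);
                try {
                    CompletableFuture.allOf(IntStream.range(0, 100)
                        .mapToObj(i -> CompletableFuture.runAsync(() -> {
                            try (OutputStream os = Files.newOutputStream(
                                    zipfs.getPath("/entry-" + i + ".bin"))) {
                                os.write(new byte[64 * 1024]);   // payload for this entry
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        }, pool))
                        .toArray(CompletableFuture[]::new)).join();
                } finally {
                    pool.shutdown();
                }
            }   // closing the file system writes bundle.zip to disk
        }
    }

Before this change, each entry written this way was buffered uncompressed and only deflated when the file system was synced; the hunks below move the deflation into the per-entry output streams so the work can happen as the entries are written.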

*** 33,44 ****
--- 33,46 ----
  import java.io.IOException;
  import java.io.InputStream;
  import java.io.OutputStream;
  import java.nio.ByteBuffer;
  import java.nio.MappedByteBuffer;
+ import java.nio.channels.Channels;
  import java.nio.channels.FileChannel;
  import java.nio.channels.FileLock;
+ import java.nio.channels.NonWritableChannelException;
  import java.nio.channels.ReadableByteChannel;
  import java.nio.channels.SeekableByteChannel;
  import java.nio.channels.WritableByteChannel;
  import java.nio.file.*;
  import java.nio.file.attribute.FileAttribute;
*** 597,611 ****
          }
          if (options.contains(APPEND) && options.contains(TRUNCATE_EXISTING))
              throw new IllegalArgumentException("APPEND + TRUNCATE_EXISTING not allowed");
      }
  
- 
      // Returns an output SeekableByteChannel for either
      // (1) writing the contents of a new entry, if the entry doesn't exit, or
      // (2) updating/replacing the contents of an existing entry.
!     // Note: The content is not compressed.
      private class EntryOutputChannel extends ByteArrayChannel {
          Entry e;
  
          EntryOutputChannel(Entry e) throws IOException {
              super(e.size > 0? (int)e.size : 8192, false);
--- 599,613 ----
          }
          if (options.contains(APPEND) && options.contains(TRUNCATE_EXISTING))
              throw new IllegalArgumentException("APPEND + TRUNCATE_EXISTING not allowed");
      }
  
      // Returns an output SeekableByteChannel for either
      // (1) writing the contents of a new entry, if the entry doesn't exit, or
      // (2) updating/replacing the contents of an existing entry.
!     // Note: The content of the channel is not compressed until the
!     // channel is closed
      private class EntryOutputChannel extends ByteArrayChannel {
          Entry e;
  
          EntryOutputChannel(Entry e) throws IOException {
              super(e.size > 0? (int)e.size : 8192, false);
*** 620,642 ****
                  e.flag |= FLAG_USE_UTF8;
          }
  
          @Override
          public void close() throws IOException {
!             e.bytes = toByteArray();
!             e.size = e.bytes.length;
!             e.crc = -1;
              super.close();
-             update(e);
          }
      }
  
      private int getCompressMethod(FileAttribute<?>... attrs) {
          return defaultMethod;
      }
  
!     // Returns a Writable/ReadByteChannel for now. Might consdier to use
      // newFileChannel() instead, which dump the entry data into a regular
      // file on the default file system and create a FileChannel on top of
      // it.
      SeekableByteChannel newByteChannel(byte[] path, Set<? extends OpenOption> options,
--- 622,643 ----
                  e.flag |= FLAG_USE_UTF8;
          }
  
          @Override
          public void close() throws IOException {
!             OutputStream os = getOutputStream(e);
!             os.write(toByteArray());
!             os.close();        // will update the entry
              super.close();
          }
      }
  
      private int getCompressMethod(FileAttribute<?>... attrs) {
          return defaultMethod;
      }
  
!     // Returns a Writable/ReadByteChannel for now. Might consider to use
      // newFileChannel() instead, which dump the entry data into a regular
      // file on the default file system and create a FileChannel on top of
      // it.
      SeekableByteChannel newByteChannel(byte[] path, Set<? extends OpenOption> options,
*** 645,658 ****
      {
          checkOptions(options);
          if (options.contains(StandardOpenOption.WRITE) ||
              options.contains(StandardOpenOption.APPEND)) {
              checkWritable();
!             beginRead();    // only need a readlock, the "update()" will obtain
!                             // thewritelock when the channel is closed
              try {
-                 ensureOpen();
                  Entry e = getEntry(path);
                  if (e != null) {
                      if (e.isDir() || options.contains(CREATE_NEW))
                          throw new FileAlreadyExistsException(getString(path));
                      SeekableByteChannel sbc =
--- 646,658 ----
      {
          checkOptions(options);
          if (options.contains(StandardOpenOption.WRITE) ||
              options.contains(StandardOpenOption.APPEND)) {
              checkWritable();
!             beginRead();    // only need a read lock, the "update()" will obtain
!                             // the write lock when the channel is closed
              try {
                  Entry e = getEntry(path);
                  if (e != null) {
                      if (e.isDir() || options.contains(CREATE_NEW))
                          throw new FileAlreadyExistsException(getString(path));
                      SeekableByteChannel sbc =
*** 674,684 ****
                  if (!options.contains(CREATE) && !options.contains(CREATE_NEW))
                      throw new NoSuchFileException(getString(path));
                  checkParents(path);
                  return new EntryOutputChannel(
                      new Entry(path, Entry.NEW, false, getCompressMethod(attrs)));
- 
              } finally {
                  endRead();
              }
          } else {
              beginRead();
--- 674,683 ----
                  if (!options.contains(CREATE) && !options.contains(CREATE_NEW))
                      throw new NoSuchFileException(getString(path));
                  checkParents(path);
                  return new EntryOutputChannel(
                      new Entry(path, Entry.NEW, false, getCompressMethod(attrs)));
              } finally {
                  endRead();
              }
          } else {
              beginRead();
*** 741,751 ****
                             .provider()
                             .newFileChannel(tmpfile, options, attrs);
          final Entry u = isFCH ? e : new Entry(path, tmpfile, Entry.FILECH);
          if (forWrite) {
              u.flag = FLAG_DATADESCR;
!             u.method = getCompressMethod(attrs);
          }
          // is there a better way to hook into the FileChannel's close method?
          return new FileChannel() {
              public int write(ByteBuffer src) throws IOException {
                  return fch.write(src);
--- 740,750 ----
                             .provider()
                             .newFileChannel(tmpfile, options, attrs);
          final Entry u = isFCH ? e : new Entry(path, tmpfile, Entry.FILECH);
          if (forWrite) {
              u.flag = FLAG_DATADESCR;
!             u.method = getCompressMethod();
          }
          // is there a better way to hook into the FileChannel's close method?
          return new FileChannel() {
              public int write(ByteBuffer src) throws IOException {
                  return fch.write(src);
*** 842,852 ****
          }
      }
  
      // the outstanding input streams that need to be closed
      private Set<InputStream> streams =
!         Collections.synchronizedSet(new HashSet<InputStream>());
  
      private Set<Path> tmppaths = Collections.synchronizedSet(new HashSet<Path>());
      private Path getTempPathForEntry(byte[] path) throws IOException {
          Path tmpPath = createTempFileInSameDirectoryAs(zfpath);
          if (path != null) {
--- 841,855 ----
          }
      }
  
      // the outstanding input streams that need to be closed
      private Set<InputStream> streams =
!         Collections.synchronizedSet(new HashSet<>());
!
!     // the ex-channel and ex-path that need to close when their outstanding
!     // input streams are all closed by the obtainers.
!     private Set<ExChannelCloser> exChClosers = new HashSet<>();
  
      private Set<Path> tmppaths = Collections.synchronizedSet(new HashSet<Path>());
      private Path getTempPathForEntry(byte[] path) throws IOException {
          Path tmpPath = createTempFileInSameDirectoryAs(zfpath);
          if (path != null) {
*** 1200,1249 ****
              locoff += n;
          }
          return written;
      }
  
!     private long writeEntry(Entry e, OutputStream os, byte[] buf)
          throws IOException {
          if (e.bytes == null && e.file == null)    // dir, 0-length data
              return 0;
          long written = 0;
!         try (OutputStream os2 = e.method == METHOD_STORED ?
              new EntryOutputStreamCRC32(e, os) : new EntryOutputStreamDef(e, os)) {
!             if (e.bytes != null) {                 // in-memory
!                 os2.write(e.bytes, 0, e.bytes.length);
!             } else if (e.file != null) {           // tmp file
!                 if (e.type == Entry.NEW || e.type == Entry.FILECH) {
!                     try (InputStream is = Files.newInputStream(e.file)) {
!                         is.transferTo(os2);
!                     }
!                 }
!                 Files.delete(e.file);
!                 tmppaths.remove(e.file);
              }
          }
          written += e.csize;
          if ((e.flag & FLAG_DATADESCR) != 0) {
              written += e.writeEXT(os);
          }
          return written;
      }
  
      // sync the zip file system, if there is any udpate
      private void sync() throws IOException {
!         if (!hasUpdate)
              return;
          Path tmpFile = createTempFileInSameDirectoryAs(zfpath);
!         try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tmpFile, WRITE)))
!         {
              ArrayList<Entry> elist = new ArrayList<>(inodes.size());
              long written = 0;
!             byte[] buf = new byte[8192];
!             Entry e = null;
  
              // write loc
              for (IndexNode inode : inodes.values()) {
                  if (inode instanceof Entry) {    // an updated inode
                      e = (Entry)inode;
--- 1203,1269 ----
              locoff += n;
          }
          return written;
      }
  
!     private long writeEntry(Entry e, OutputStream os)
          throws IOException {
          if (e.bytes == null && e.file == null)    // dir, 0-length data
              return 0;
          long written = 0;
!         if (e.crc != 0 && e.csize > 0) {
!             // pre-compressed entry, write directly to output stream
!             writeTo(e, os);
!         } else {
!             try (OutputStream os2 = (e.method == METHOD_STORED) ?
                  new EntryOutputStreamCRC32(e, os) : new EntryOutputStreamDef(e, os)) {
!                 writeTo(e, os2);
              }
          }
          written += e.csize;
          if ((e.flag & FLAG_DATADESCR) != 0) {
              written += e.writeEXT(os);
          }
          return written;
      }
  
+     private void writeTo(Entry e, OutputStream os) throws IOException {
+         if (e.bytes != null) {
+             os.write(e.bytes, 0, e.bytes.length);
+         } else if (e.file != null) {
+             if (e.type == Entry.NEW || e.type == Entry.FILECH) {
+                 try (InputStream is = Files.newInputStream(e.file)) {
+                     is.transferTo(os);
+                 }
+             }
+             Files.delete(e.file);
+             tmppaths.remove(e.file);
+         }
+     }
+ 
      // sync the zip file system, if there is any udpate
      private void sync() throws IOException {
!         // check ex-closer
!         if (!exChClosers.isEmpty()) {
!             for (ExChannelCloser ecc : exChClosers) {
!                 if (ecc.streams.isEmpty()) {
!                     ecc.ch.close();
!                     Files.delete(ecc.path);
!                     exChClosers.remove(ecc);
!                 }
!             }
!         }
          if (!hasUpdate)
              return;
          Path tmpFile = createTempFileInSameDirectoryAs(zfpath);
!         try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tmpFile, WRITE))) {
              ArrayList<Entry> elist = new ArrayList<>(inodes.size());
              long written = 0;
!             byte[] buf = null;
!             Entry e;
  
              // write loc
              for (IndexNode inode : inodes.values()) {
                  if (inode instanceof Entry) {    // an updated inode
                      e = (Entry)inode;
*** 1252,1266 ****
                              // entry copy: the only thing changed is the "name"
                              // and "nlen" in LOC header, so we udpate/rewrite the
                              // LOC in new file and simply copy the rest (data and
                              // ext) without enflating/deflating from the old zip
                              // file LOC entry.
                              written += copyLOCEntry(e, true, os, written, buf);
                          } else {                          // NEW, FILECH or CEN
                              e.locoff = written;
                              written += e.writeLOC(os);    // write loc header
!                             written += writeEntry(e, os, buf);
                          }
                          elist.add(e);
                      } catch (IOException x) {
                          x.printStackTrace();    // skip any in-accurate entry
                      }
--- 1272,1288 ----
                              // entry copy: the only thing changed is the "name"
                              // and "nlen" in LOC header, so we udpate/rewrite the
                              // LOC in new file and simply copy the rest (data and
                              // ext) without enflating/deflating from the old zip
                              // file LOC entry.
+                             if (buf == null)
+                                 buf = new byte[8192];
                              written += copyLOCEntry(e, true, os, written, buf);
                          } else {                          // NEW, FILECH or CEN
                              e.locoff = written;
                              written += e.writeLOC(os);    // write loc header
!                             written += writeEntry(e, os);
                          }
                          elist.add(e);
                      } catch (IOException x) {
                          x.printStackTrace();    // skip any in-accurate entry
                      }
*** 1272,1281 ****
                          continue;               // no root '/' directory even it
                                                  // exits in original zip/jar file.
                      }
                      e = Entry.readCEN(this, inode);
                      try {
                          written += copyLOCEntry(e, false, os, written, buf);
                          elist.add(e);
                      } catch (IOException x) {
                          x.printStackTrace();    // skip any wrong entry
                      }
--- 1294,1305 ----
                          continue;               // no root '/' directory even it
                                                  // exits in original zip/jar file.
                      }
                      e = Entry.readCEN(this, inode);
                      try {
+                         if (buf == null)
+                             buf = new byte[8192];
                          written += copyLOCEntry(e, false, os, written, buf);
                          elist.add(e);
                      } catch (IOException x) {
                          x.printStackTrace();    // skip any wrong entry
                      }
*** 1289,1301 ****
              }
              end.centot = elist.size();
              end.cenlen = written - end.cenoff;
              end.write(os, written, forceEnd64);
          }
  
!         ch.close();
          Files.delete(zfpath);
          Files.move(tmpFile, zfpath, REPLACE_EXISTING);
          hasUpdate = false;    // clear
      }
  
      IndexNode getInode(byte[] path) {
--- 1313,1339 ----
              }
              end.centot = elist.size();
              end.cenlen = written - end.cenoff;
              end.write(os, written, forceEnd64);
          }
  
!         if (!streams.isEmpty()) {
!             //
!             // There are outstanding input streams open on existing "ch",
!             // so, don't close the "cha" and delete the "file for now, let
!             // the "ex-channel-closer" to handle them
!             ExChannelCloser ecc = new ExChannelCloser(
!                                       createTempFileInSameDirectoryAs(zfpath),
!                                       ch,
!                                       streams);
!             Files.move(zfpath, ecc.path, REPLACE_EXISTING);
!             exChClosers.add(ecc);
!             streams = Collections.synchronizedSet(new HashSet<InputStream>());
!         } else {
              ch.close();
              Files.delete(zfpath);
+         }
+ 
          Files.move(tmpFile, zfpath, REPLACE_EXISTING);
          hasUpdate = false;    // clear
      }
  
      IndexNode getInode(byte[] path) {
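The hunk above reintroduces a deferred-close step in sync(): when readers still hold input streams against the old zip channel, the old file is renamed aside and only closed and deleted later, once no readers remain. A simplified, standalone sketch of that idea (the class and method names here are mine, not from the patch):

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.channels.SeekableByteChannel;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Set;

    final class RetiredZipChannel {
        private final Path movedAsidePath;          // old zip moved out of the way
        private final SeekableByteChannel channel;  // channel still serving old readers
        private final Set<InputStream> readers;     // streams opened before the rewrite

        RetiredZipChannel(Path movedAsidePath, SeekableByteChannel channel,
                          Set<InputStream> readers) {
            this.movedAsidePath = movedAsidePath;
            this.channel = channel;
            this.readers = readers;
        }

        // Called opportunistically (e.g. on a later sync): release the channel
        // and the renamed file only once every outstanding reader is closed.
        boolean closeIfUnused() throws IOException {
            if (!readers.isEmpty())
                return false;
            channel.close();
            Files.deleteIfExists(movedAsidePath);
            return true;
        }
    }

This is the lifecycle that the ExChannelCloser holder (defined in a later hunk) and the "check ex-closer" loop at the top of sync() implement.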
*** 1349,1363 ****
              e.file = getTempPathForEntry(null);
              os = Files.newOutputStream(e.file, WRITE);
          } else {
              os = new ByteArrayOutputStream((e.size > 0)? (int)e.size : 8192);
          }
          return new EntryOutputStream(e, os);
      }
  
      private class EntryOutputStream extends FilterOutputStream {
!         private Entry e;
          private long written;
          private boolean isClosed;
  
          EntryOutputStream(Entry e, OutputStream os) throws IOException {
              super(os);
--- 1387,1405 ----
              e.file = getTempPathForEntry(null);
              os = Files.newOutputStream(e.file, WRITE);
          } else {
              os = new ByteArrayOutputStream((e.size > 0)? (int)e.size : 8192);
          }
+         if (e.method == METHOD_DEFLATED) {
+             return new DeflatingEntryOutputStream(e, os);
+         } else {
              return new EntryOutputStream(e, os);
          }
+     }
  
      private class EntryOutputStream extends FilterOutputStream {
!         private final Entry e;
          private long written;
          private boolean isClosed;
  
          EntryOutputStream(Entry e, OutputStream os) throws IOException {
              super(os);
*** 1390,1406 ****
              super.close();
              update(e);
          }
      }
  
      // Wrapper output stream class to write out a "stored" entry.
      // (1) this class does not close the underlying out stream when
      //     being closed.
      // (2) no need to be "synchronized", only used by sync()
      private class EntryOutputStreamCRC32 extends FilterOutputStream {
!         private Entry e;
!         private CRC32 crc;
          private long written;
          private boolean isClosed;
  
          EntryOutputStreamCRC32(Entry e, OutputStream os) throws IOException {
              super(os);
--- 1432,1491 ----
              super.close();
              update(e);
          }
      }
  
+     // Output stream returned when writing "deflated" entries into memory,
+     // to enable eager (possibly parallel) deflation and reduce memory required.
+     private class DeflatingEntryOutputStream extends DeflaterOutputStream {
+         private final CRC32 crc;
+         private final Entry e;
+         private boolean isClosed;
+ 
+         DeflatingEntryOutputStream(Entry e, OutputStream os) throws IOException {
+             super(os, getDeflater());
+             this.e = Objects.requireNonNull(e, "Zip entry is null");
+             this.crc = new CRC32();
+         }
+ 
+         @Override
+         public synchronized void write(int b) throws IOException {
+             super.write(b);
+             crc.update(b);
+         }
+ 
+         @Override
+         public synchronized void write(byte b[], int off, int len)
+                 throws IOException {
+             super.write(b, off, len);
+             crc.update(b, off, len);
+         }
+ 
+         @Override
+         public synchronized void close() throws IOException {
+             if (isClosed)
+                 return;
+             isClosed = true;
+             finish();
+             e.size = def.getBytesRead();
+             e.csize = def.getBytesWritten();
+             e.crc = crc.getValue();
+             if (out instanceof ByteArrayOutputStream)
+                 e.bytes = ((ByteArrayOutputStream)out).toByteArray();
+             super.close();
+             update(e);
+             releaseDeflater(def);
+         }
+     }
+ 
      // Wrapper output stream class to write out a "stored" entry.
      // (1) this class does not close the underlying out stream when
      //     being closed.
      // (2) no need to be "synchronized", only used by sync()
      private class EntryOutputStreamCRC32 extends FilterOutputStream {
!         private final CRC32 crc;
!         private final Entry e;
          private long written;
          private boolean isClosed;
  
          EntryOutputStreamCRC32(Entry e, OutputStream os) throws IOException {
              super(os);
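The added DeflatingEntryOutputStream deflates an entry while the caller is still writing it, so only the compressed bytes are buffered and the CRC-32 and both sizes are known by the time the stream is closed. A standalone sketch of the same idea built only from java.util.zip pieces (class and variable names are mine, not from the patch; it composes CheckedOutputStream and DeflaterOutputStream rather than subclassing):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.CRC32;
    import java.util.zip.CheckedOutputStream;
    import java.util.zip.Deflater;
    import java.util.zip.DeflaterOutputStream;

    public class EagerDeflateDemo {
        public static void main(String[] args) throws IOException {
            byte[] data = "some entry content ".repeat(1000).getBytes(StandardCharsets.UTF_8);

            ByteArrayOutputStream buf = new ByteArrayOutputStream();          // holds only deflated bytes
            Deflater def = new Deflater(Deflater.DEFAULT_COMPRESSION, true);  // raw deflate, as in zip entries
            CRC32 crc = new CRC32();

            // caller -> CRC-32 tracking -> deflater -> in-memory buffer
            try (OutputStream os = new CheckedOutputStream(new DeflaterOutputStream(buf, def), crc)) {
                os.write(data);                    // could just as well arrive in chunks
            }

            long size  = def.getBytesRead();       // uncompressed size
            long csize = def.getBytesWritten();    // compressed size
            long crc32 = crc.getValue();
            byte[] compressed = buf.toByteArray(); // ready to be copied verbatim into the zip
            def.end();

            System.out.printf("size=%d csize=%d crc=%08x stored=%d%n",
                              size, csize, crc32, compressed.length);
        }
    }

Because the sizes and CRC are available up front and the bytes are already deflated, the later sync() pass (see the writeEntry change above) can copy the entry data straight through instead of re-deflating everything while holding the file-system write lock, which is where the parallel-write improvement comes from.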
*** 1436,1447 ****
      // Wrapper output stream class to write out a "deflated" entry.
      // (1) this class does not close the underlying out stream when
      //     being closed.
      // (2) no need to be "synchronized", only used by sync()
      private class EntryOutputStreamDef extends DeflaterOutputStream {
!         private CRC32 crc;
!         private Entry e;
          private boolean isClosed;
  
          EntryOutputStreamDef(Entry e, OutputStream os) throws IOException {
              super(os, getDeflater());
              this.e = Objects.requireNonNull(e, "Zip entry is null");
--- 1521,1532 ----
      // Wrapper output stream class to write out a "deflated" entry.
      // (1) this class does not close the underlying out stream when
      //     being closed.
      // (2) no need to be "synchronized", only used by sync()
      private class EntryOutputStreamDef extends DeflaterOutputStream {
!         private final CRC32 crc;
!         private final Entry e;
          private boolean isClosed;
  
          EntryOutputStreamDef(Entry e, OutputStream os) throws IOException {
              super(os, getDeflater());
              this.e = Objects.requireNonNull(e, "Zip entry is null");
*** 1469,1486 ****
      }
  
      private InputStream getInputStream(Entry e)
          throws IOException
      {
!         InputStream eis = null;
!
          if (e.type == Entry.NEW) {
-             // now bytes & file is uncompressed.
              if (e.bytes != null)
!                 return new ByteArrayInputStream(e.bytes);
              else if (e.file != null)
!                 return Files.newInputStream(e.file);
              else
                  throw new ZipException("update entry data is missing");
          } else if (e.type == Entry.FILECH) {
              // FILECH result is un-compressed.
              eis = Files.newInputStream(e.file);
--- 1554,1569 ----
      }
  
      private InputStream getInputStream(Entry e)
          throws IOException
      {
!         InputStream eis;
          if (e.type == Entry.NEW) {
              if (e.bytes != null)
!                 eis = new ByteArrayInputStream(e.bytes);
              else if (e.file != null)
!                 eis = Files.newInputStream(e.file);
              else
                  throw new ZipException("update entry data is missing");
          } else if (e.type == Entry.FILECH) {
              // FILECH result is un-compressed.
              eis = Files.newInputStream(e.file);
*** 1577,1587 ****
              }
              if (len > rem) {
                  len = (int) rem;
              }
              // readFullyAt()
!             long n = 0;
              ByteBuffer bb = ByteBuffer.wrap(b);
              bb.position(off);
              bb.limit(off + len);
              synchronized(zfch) {
                  n = zfch.position(pos).read(bb);
--- 1660,1670 ----
              }
              if (len > rem) {
                  len = (int) rem;
              }
              // readFullyAt()
!             long n;
              ByteBuffer bb = ByteBuffer.wrap(b);
              bb.position(off);
              bb.limit(off + len);
              synchronized(zfch) {
                  n = zfch.position(pos).read(bb);
*** 1903,1913 ****
          Entry(byte[] name, int type, boolean isdir, int method) {
              this(name, isdir, method);
              this.type = type;
          }
  
!         Entry (Entry e, int type) {
              name(e.name);
              this.isdir = e.isdir;
              this.version = e.version;
              this.ctime = e.ctime;
              this.atime = e.atime;
--- 1986,1996 ----
          Entry(byte[] name, int type, boolean isdir, int method) {
              this(name, isdir, method);
              this.type = type;
          }
  
!         Entry(Entry e, int type) {
              name(e.name);
              this.isdir = e.isdir;
              this.version = e.version;
              this.ctime = e.ctime;
              this.atime = e.atime;
*** 1926,1936 ****
              this.locoff = e.locoff;
              this.comment = e.comment;
              this.type = type;
          }
  
!         Entry (byte[] name, Path file, int type) {
              this(name, type, false, METHOD_STORED);
              this.file = file;
          }
  
          int version() throws ZipException {
--- 2009,2019 ----
              this.locoff = e.locoff;
              this.comment = e.comment;
              this.type = type;
          }
  
!         Entry(byte[] name, Path file, int type) {
              this(name, type, false, METHOD_STORED);
              this.file = file;
          }
  
          int version() throws ZipException {
*** 2422,2431 ****
              fm.close();
              return sb.toString();
          }
      }
  
      // ZIP directory has two issues:
      // (1) ZIP spec does not require the ZIP file to include
      //     directory entry
      // (2) all entries are not stored/organized in a "tree"
      //     structure.
--- 2505,2528 ----
              fm.close();
              return sb.toString();
          }
      }
  
+     private static class ExChannelCloser {
+         Path path;
+         SeekableByteChannel ch;
+         Set<InputStream> streams;
+         ExChannelCloser(Path path,
+                         SeekableByteChannel ch,
+                         Set<InputStream> streams)
+         {
+             this.path = path;
+             this.ch = ch;
+             this.streams = streams;
+         }
+     }
+ 
      // ZIP directory has two issues:
      // (1) ZIP spec does not require the ZIP file to include
      //     directory entry
      // (2) all entries are not stored/organized in a "tree"
      //     structure.