Skip to content

Commit

Permalink
up
Browse files Browse the repository at this point in the history
  • Loading branch information
youfanx committed Aug 28, 2023
1 parent 5ecefb1 commit 44db463
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 43 deletions.
77 changes: 43 additions & 34 deletions rxlib/src/main/java/org/rx/io/HybridStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,76 +13,85 @@ public final class HybridStream extends IOStream implements Serializable {
private static final long serialVersionUID = 2137331266386948293L;
private final int maxMemorySize;
private final String tempFilePath;
private MemoryStream memoryStream;
private FileStream fileStream;
private IOStream stream;
@Setter
private String name;

private synchronized IOStream getStream() {
if (fileStream != null) {
return fileStream;
}
if (memoryStream.getLength() > maxMemorySize) {
log.info("Arrival MaxMemorySize[{}] threshold, switch FileStream", maxMemorySize);
fileStream = tempFilePath == null ? new FileStream() : new FileStream(tempFilePath);
fileStream.write(memoryStream.rewind());
memoryStream.close();
memoryStream = null;
return fileStream;
}
return memoryStream;
}

@Override
public String getName() {
if (name == null) {
return getStream().getName();
return stream.getName();
}
return name;
}

@Override
public InputStream getReader() {
return getStream().getReader();
public synchronized InputStream getReader() {
return stream.getReader();
}

@Override
public OutputStream getWriter() {
return getStream().getWriter();
public synchronized OutputStream getWriter() {
checkCapacity();
return stream.getWriter();
}

@Override
public boolean canSeek() {
return getStream().canSeek();
public synchronized boolean canSeek() {
return stream.canSeek();
}

@Override
public long getPosition() {
return getStream().getPosition();
public synchronized long getPosition() {
return stream.getPosition();
}

@Override
public void setPosition(long position) {
getStream().setPosition(position);
public synchronized void setPosition(long position) {
checkCapacity();
stream.setPosition(position);
}

@Override
public long getLength() {
return getStream().getLength();
public synchronized long getLength() {
return stream.getLength();
}

public HybridStream() {
this(Constants.MAX_HEAP_BUF_SIZE, false, null);
this(Constants.MAX_HEAP_BUF_SIZE, false);
}

public HybridStream(int maxMemorySize, boolean directMemory) {
this(maxMemorySize, directMemory, null);
}

public HybridStream(int maxMemorySize, boolean directMemory, String tempFilePath) {
this.maxMemorySize = maxMemorySize;
if ((this.maxMemorySize = maxMemorySize) > Constants.MAX_HEAP_BUF_SIZE) {
log.warn("maxMemorySize gt {}", Constants.MAX_HEAP_BUF_SIZE);
}
this.tempFilePath = tempFilePath;
memoryStream = new MemoryStream(Constants.HEAP_BUF_SIZE, directMemory);
stream = maxMemorySize <= 0 ? newFileStream() : new MemoryStream(maxMemorySize, directMemory);
}

@Override
protected void freeObjects() {
getStream().close();
stream.close();
}

FileStream newFileStream() {
return tempFilePath == null ? new FileStream() : new FileStream(tempFilePath);
}

synchronized void checkCapacity() {
if (stream instanceof FileStream) {
return;
}
if (stream.getLength() > maxMemorySize) {
log.info("Arrival MaxMemorySize[{}] threshold, switch FileStream", maxMemorySize);
FileStream fs = newFileStream();
fs.write(stream.rewind());
stream.close();
stream = fs;
}
}
}
2 changes: 1 addition & 1 deletion rxlib/src/main/java/org/rx/io/Serializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ default <T> IOStream serialize(T obj) {
}

default <T> HybridStream serialize(T obj, int maxMemorySize, boolean directMemory) {
HybridStream stream = new HybridStream(maxMemorySize, directMemory, null);
HybridStream stream = new HybridStream(maxMemorySize, directMemory);
serialize(obj, stream);
return stream.rewind();
}
Expand Down
9 changes: 4 additions & 5 deletions rxlib/src/main/java/org/rx/net/http/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
import org.apache.commons.collections4.MapUtils;
import org.rx.bean.ProceedEventArgs;
import org.rx.bean.Tuple;
import org.rx.core.Linq;
import org.rx.core.Reflects;
import org.rx.core.RxConfig;
import org.rx.core.Strings;
import org.rx.core.*;
import org.rx.exception.InvalidException;
import org.rx.io.Files;
import org.rx.io.HybridStream;
Expand All @@ -33,6 +30,7 @@
import java.io.File;
import java.io.InputStream;
import java.io.Serializable;
import java.lang.StringBuilder;
import java.net.Proxy;
import java.net.URLDecoder;
import java.net.URLEncoder;
Expand Down Expand Up @@ -199,7 +197,8 @@ public synchronized <T> T handle(BiFunc<InputStream, T> fn) {
throw new InvalidException("Empty response from url {}", getResponseUrl());
}
try {
stream = new HybridStream();
Long len = Reflects.changeType(response.header(HttpHeaderNames.CONTENT_LENGTH.toString()), Long.class);
stream = new HybridStream(len != null && len > Constants.MAX_HEAP_BUF_SIZE ? 0 : Constants.MAX_HEAP_BUF_SIZE, false);
stream.write(body.byteStream());
} finally {
body.close();
Expand Down
7 changes: 4 additions & 3 deletions rxlib/src/test/java/org/rx/io/TestIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -481,10 +481,11 @@ public void zipStream() {

@Test
public void hybridStream() {
int[] maxSizes = new int[]{35, 70};
for (int max : maxSizes) {
HybridStream stream = new HybridStream(max, true, null);
int[] maxMemSizes = new int[]{20, 70};
for (int memSize : maxMemSizes) {
HybridStream stream = new HybridStream(memSize, true, null);
testSeekStream(stream);
// stream.write(new byte[Constants.MAX_HEAP_BUF_SIZE]);

long position = stream.getPosition();
System.out.println(position);
Expand Down

0 comments on commit 44db463

Please sign in to comment.