Skip to content

Commit

Permalink
rmt
Browse files Browse the repository at this point in the history
  • Loading branch information
chenby committed Aug 19, 2018
1 parent f62d9b7 commit 1b46529
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,22 @@
import com.moilioncircle.redis.rdb.cli.ext.CliRedisReplicator;
import com.moilioncircle.redis.rdb.cli.ext.rmt.ClusterRdbVisitor;
import com.moilioncircle.redis.rdb.cli.ext.rmt.SingleRdbVisitor;
import com.moilioncircle.redis.rdb.cli.glossary.DataType;
import com.moilioncircle.redis.rdb.cli.net.Endpoint;
import com.moilioncircle.redis.rdb.cli.util.ProgressBar;
import com.moilioncircle.redis.replicator.FileType;
import com.moilioncircle.redis.replicator.RedisURI;
import com.moilioncircle.redis.replicator.Replicator;
import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
import com.moilioncircle.redis.replicator.event.PreCommandSyncEvent;
import com.moilioncircle.redis.replicator.event.PreRdbSyncEvent;
import com.moilioncircle.redis.replicator.rdb.RdbVisitor;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;

import java.io.File;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;

import static com.moilioncircle.redis.rdb.cli.glossary.DataType.parse;
Expand Down Expand Up @@ -112,7 +116,7 @@ protected void doExecute(CommandLine line) throws Exception {
}
try (ProgressBar bar = new ProgressBar(-1)) {
Replicator r = new CliRedisReplicator(source, configure);
r.setRdbVisitor(new SingleRdbVisitor(r, configure, migrate, db, regexs, parse(type), replace));
r.setRdbVisitor(getRdbVisitor(r, configure, uri, db, regexs, parse(type), replace));
Runtime.getRuntime().addShutdownHook(new Thread(() -> CliRedisReplicator.closeQuietly(r)));
r.addExceptionListener((rep, tx, e) -> {
throw new RuntimeException(tx.getMessage(), tx);
Expand Down Expand Up @@ -149,6 +153,19 @@ protected void doExecute(CommandLine line) throws Exception {
}
}
}

private RdbVisitor getRdbVisitor(Replicator replicator, Configure configure, RedisURI uri, List<Long> db, List<String> regexs, List<DataType> types, boolean replace) throws Exception {
try (Endpoint endpoint = new Endpoint(uri.getHost(), uri.getPort())) {
Endpoint.RedisObject r = endpoint.send("cluster".getBytes(), "nodes".getBytes());
if (r.type.isError()) {
return new SingleRdbVisitor(replicator, configure, uri, db, regexs, types, replace);
} else {
String config = r.getString();
List<String> lines = Arrays.asList(config.split("\n"));
return new ClusterRdbVisitor(replicator, configure, lines, regexs, types, replace);
}
}
}

@Override
public String name() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ public class SingleRdbVisitor extends AbstractMigrateRdbVisitor implements Event
private final RedisURI uri;
private final Configuration conf;
private ThreadLocal<Endpoint> endpoint = new ThreadLocal<>();

public SingleRdbVisitor(Replicator replicator, Configure configure, String uri, List<Long> db, List<String> regexs, List<DataType> types, boolean replace) throws Exception {
public SingleRdbVisitor(Replicator replicator, Configure configure, RedisURI uri, List<Long> db, List<String> regexs, List<DataType> types, boolean replace) throws Exception {
super(replicator, configure, db, regexs, types, replace);
this.uri = new RedisURI(uri);
this.uri = uri;
this.conf = configure.merge(this.uri);
this.replicator.addEventListener(new AsyncEventListener(this, replicator, configure));
}
Expand Down
146 changes: 100 additions & 46 deletions src/main/java/com/moilioncircle/redis/rdb/cli/net/Endpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.moilioncircle.redis.replicator.io.RedisInputStream;
import com.moilioncircle.redis.replicator.net.RedisSocketFactory;
import com.moilioncircle.redis.replicator.util.ByteBuilder;
import com.moilioncircle.redis.replicator.util.Strings;
import io.dropwizard.metrics5.Counter;
import io.dropwizard.metrics5.MetricRegistry;
import org.slf4j.Logger;
Expand All @@ -47,14 +46,14 @@
* @author Baoyi Chen
*/
public class Endpoint implements Closeable {

private static final Logger logger = LoggerFactory.getLogger(Endpoint.class);

private static final int BUFFER = 64 * 1024;
private static final byte[] AUTH = "auth".getBytes();
private static final byte[] PING = "ping".getBytes();
private static final byte[] SELECT = "select".getBytes();

private int db;
private int count = 0;
private final int pipe;
Expand All @@ -69,6 +68,10 @@ public class Endpoint implements Closeable {
private final Counter counterErr;
private final MetricRegistry registry;

public Endpoint(String host, int port) {
this(host, port, 0, 1, null, Configuration.defaultSetting());
}

public Endpoint(String host, int port, int db, int pipe, MetricRegistry registry, Configuration conf) {
this.host = host;
this.port = port;
Expand All @@ -80,26 +83,26 @@ public Endpoint(String host, int port, int db, int pipe, MetricRegistry registry
this.in = new RedisInputStream(this.socket.getInputStream(), BUFFER);
this.out = new BufferedOutputStream(this.socket.getOutputStream(), BUFFER);
if (conf.getAuthPassword() != null) {
String r = send(AUTH, conf.getAuthPassword().getBytes());
if (r != null) throw new RuntimeException(r);
RedisObject r = send(AUTH, conf.getAuthPassword().getBytes());
if (r != null && r.type.isError()) throw new RuntimeException(r.getString());
} else {
String r = send(PING);
if (r != null) throw new RuntimeException(r);
RedisObject r = send(PING);
if (r != null && r.type.isError()) throw new RuntimeException(r.getString());
}
String r = send(SELECT, String.valueOf(db).getBytes());
if (r != null) throw new RuntimeException(r);
RedisObject r = send(SELECT, String.valueOf(db).getBytes());
if (r != null && r.type.isError()) throw new RuntimeException(r.getString());
this.db = db;
String address = address(socket);
this.registry = registry;
String suc = "endpoint_suc_" + Thread.currentThread().getName();
String err = "endpoint_err_" + Thread.currentThread().getName();
this.counterSuc = registry.counter(name(suc, "address", address, "mtype", "suc"));
this.counterErr = registry.counter(name(err, "address", address, "mtype", "err"));
this.counterSuc = registry != null ? registry.counter(name(suc, "address", address, "mtype", "suc")) : null;
this.counterErr = registry != null ? registry.counter(name(err, "address", address, "mtype", "err")) : null;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public String address(Socket socket) {
Objects.requireNonNull(socket);
InetSocketAddress la = (InetSocketAddress) socket.getLocalSocketAddress();
Expand All @@ -120,12 +123,12 @@ public String address(Socket socket) {
builder.append("]");
return builder.toString();
}

public int getDB() {
return db;
}

public String send(byte[] command, byte[]... ary) {
public RedisObject send(byte[] command, byte[]... ary) {
try {
emit(out, command, ary);
out.flush();
Expand All @@ -134,12 +137,12 @@ public String send(byte[] command, byte[]... ary) {
throw new RuntimeException(e);
}
}

public void select(boolean force, int db) {
batch(force, SELECT, String.valueOf(db).getBytes());
this.db = db;
}

public void batch(boolean force, byte[] command, byte[]... args) {
try {
emit(out, command, args);
Expand All @@ -150,18 +153,18 @@ public void batch(boolean force, byte[] command, byte[]... args) {
throw new RuntimeException(e);
}
}

public void flush() {
try {
if (count > 0) {
OutputStreams.flush(out);
for (int i = 0; i < count; i++) {
String r = parse();
if (r != null) {
logger.error(r);
counterErr.inc();
RedisObject r = parse();
if (r != null && r.type.isError()) {
logger.error(r.getString());
if (counterErr != null) counterErr.inc();
} else {
counterSuc.inc();
if (counterSuc != null) counterSuc.inc();
}
}
count = 0;
Expand All @@ -170,14 +173,14 @@ public void flush() {
throw new RuntimeException(e);
}
}

@Override
public void close() throws IOException {
Sockets.closeQuietly(in);
Sockets.closeQuietly(out);
Sockets.closeQuietly(socket);
}

public static void close(Endpoint endpoint) {
if (endpoint == null) return;
try {
Expand All @@ -186,20 +189,20 @@ public static void close(Endpoint endpoint) {
throw new RuntimeException(e);
}
}

public static void closeQuietly(Endpoint endpoint) {
if (endpoint == null) return;
try {
endpoint.close();
} catch (Throwable e) {
}
}

public static Endpoint valueOf(Endpoint endpoint) {
closeQuietly(endpoint);
return new Endpoint(endpoint.host, endpoint.port, endpoint.db, endpoint.pipe, endpoint.registry, endpoint.conf);
}

private void emit(OutputStream out, byte[] command, byte[]... ary) throws IOException {
out.write(STAR);
out.write(String.valueOf(ary.length + 1).getBytes());
Expand All @@ -222,8 +225,8 @@ private void emit(OutputStream out, byte[] command, byte[]... ary) throws IOExce
out.write('\n');
}
}

private String parse() throws IOException {
private RedisObject parse() throws IOException {
while (true) {
int c = in.read();
switch (c) {
Expand All @@ -241,23 +244,26 @@ private String parse() throws IOException {
}
}
long len = Long.parseLong(builder.toString());
if (len == -1) return null;
in.skip(len);
return null;
if (len == -1) return new RedisObject(RedisObject.Type.NULL, null);
return new RedisObject(RedisObject.Type.BULK, in.readBytes(len).first());
case COLON:
// RESP Integers
builder = ByteBuilder.allocate(128);
while (true) {
while (in.read() != '\r') {
while ((c = in.read()) != '\r') {
builder.put((byte) c);
}
if (in.read() == '\n') {
if ((c = in.read()) == '\n') {
break;
} else {
builder.put((byte) c);
}
}
// As integer
return null;
return new RedisObject(RedisObject.Type.NUMBER, Long.parseLong(builder.toString()));
case STAR:
// RESP Arrays
builder = ByteBuilder.allocate(32);
builder = ByteBuilder.allocate(128);
while (true) {
while ((c = in.read()) != '\r') {
builder.put((byte) c);
Expand All @@ -269,18 +275,24 @@ private String parse() throws IOException {
}
}
len = Long.parseLong(builder.toString());
if (len == -1) return null;
if (len == -1) return new RedisObject(RedisObject.Type.NULL, null);
RedisObject[] ary = new RedisObject[(int) len];
for (int i = 0; i < len; i++) {
parse();
RedisObject obj = parse();
ary[i] = obj;
}
return null;
return new RedisObject(RedisObject.Type.ARRAY, ary);
case PLUS:
// RESP Simple Strings
builder = ByteBuilder.allocate(128);
while (true) {
while (in.read() != '\r') {
while ((c = in.read()) != '\r') {
builder.put((byte) c);
}
if (in.read() == '\n') {
return null;
if ((c = in.read()) == '\n') {
return new RedisObject(RedisObject.Type.STRING, builder.array());
} else {
builder.put((byte) c);
}
}
case MINUS:
Expand All @@ -291,14 +303,56 @@ private String parse() throws IOException {
builder.put((byte) c);
}
if ((c = in.read()) == '\n') {
return Strings.toString(builder.array());
return new RedisObject(RedisObject.Type.ERR, builder.array());
} else {
builder.put((byte) c);
}
}
default:
throw new RuntimeException("expect [$,:,*,+,-] but: " + (char) c);


}
}
}

public static class RedisObject {
public Type type;
public Object object;

public RedisObject(Type type, Object object) {
this.type = type;
this.object = object;
}

public String getString() {
if (type.isString() || type.isError()) {
byte[] bytes = (byte[]) object;
return new String(bytes);
}
return null;
}

public enum Type {
ARRAY, NUMBER, STRING, BULK, ERR, NULL;

public boolean isString() {
return this == BULK || this == STRING;
}

public boolean isArray() {
return this == ARRAY;
}

public boolean isNumber() {
return this == NUMBER;
}

public boolean isError() {
return this == ERR;
}

public boolean isNull() {
return this == NULL;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public Endpoints(List<String> lines, int pipe, MetricRegistry registry, Configur
};
new NodeConfParser(mapper).parse(lines, set, endpoints);
}

public String send(byte[] command, byte[]... args) {
public Endpoint.RedisObject send(byte[] command, byte[]... args) {
short slot = slot(args[0]);
return endpoints.get(slot).send(command, args);
}
Expand Down

0 comments on commit 1b46529

Please sign in to comment.