diff --git a/src/main/java/org/fluentd/logger/FluentLogger.java b/src/main/java/org/fluentd/logger/FluentLogger.java index 45bc41b..5f8d278 100644 --- a/src/main/java/org/fluentd/logger/FluentLogger.java +++ b/src/main/java/org/fluentd/logger/FluentLogger.java @@ -82,10 +82,18 @@ public boolean log(String tag, String key, Object value, long timestamp) { } public boolean log(String tag, Map data) { - return log(tag, data, 0); + return log(tag, (Object)data, 0); } public boolean log(String tag, Map data, long timestamp) { + return log(tag, (Object)data, timestamp); + } + + public boolean log(String tag, Object data) { + return log(tag, data, 0); + } + + public boolean log(String tag, Object data, long timestamp) { String concatTag = null; if (tagPrefix == null || tagPrefix.length() == 0) { concatTag = tag; diff --git a/src/main/java/org/fluentd/logger/sender/DefaultEventTemplate.java b/src/main/java/org/fluentd/logger/sender/DefaultEventTemplate.java new file mode 100644 index 0000000..9c24542 --- /dev/null +++ b/src/main/java/org/fluentd/logger/sender/DefaultEventTemplate.java @@ -0,0 +1,12 @@ +package org.fluentd.logger.sender; + +import java.io.IOException; + +import org.msgpack.packer.Packer; + +public class DefaultEventTemplate extends EventTemplate { + @Override + protected void doWriteData(Packer pk, Object data, boolean required) throws IOException { + pk.write(data); + } +} diff --git a/src/main/java/org/fluentd/logger/sender/Event.java b/src/main/java/org/fluentd/logger/sender/Event.java index 3eb2425..c107109 100644 --- a/src/main/java/org/fluentd/logger/sender/Event.java +++ b/src/main/java/org/fluentd/logger/sender/Event.java @@ -17,30 +17,22 @@ // package org.fluentd.logger.sender; -import java.io.IOException; -import java.util.Map; - -import org.msgpack.MessageTypeException; -import org.msgpack.packer.Packer; -import org.msgpack.template.AbstractTemplate; -import org.msgpack.template.Templates; -import org.msgpack.unpacker.Unpacker; public class Event { public String tag; public long timestamp; - public Map data; + public Object data; public Event() { } - public Event(String tag, Map data) { + public Event(String tag, Object data) { this(tag, System.currentTimeMillis() / 1000, data); } - public Event(String tag, long timestamp, Map data) { + public Event(String tag, long timestamp, Object data) { this.tag = tag; this.timestamp = timestamp; this.data = data; @@ -51,42 +43,4 @@ public String toString() { return String.format("Event{tag=%s,timestamp=%d,data=%s}", tag, timestamp, data.toString()); } - - public static class EventTemplate extends AbstractTemplate { - public static EventTemplate INSTANCE = new EventTemplate(); - - public void write(Packer pk, Event v, boolean required) throws IOException { - if (v == null) { - if (required) { - throw new MessageTypeException("Attempted to write null"); - } - pk.writeNil(); - return; - } - - pk.writeArrayBegin(3); - { - Templates.TString.write(pk, v.tag, required); - Templates.TLong.write(pk, v.timestamp, required); - pk.writeMapBegin(v.data.size()); - { - for (Map.Entry entry : v.data.entrySet()) { - Templates.TString.write(pk, entry.getKey(), required); - try { - pk.write(entry.getValue()); - } catch (MessageTypeException e) { - String val = entry.getValue().toString(); - Templates.TString.write(pk, val, required); - } - } - } - pk.writeMapEnd(); - } - pk.writeArrayEnd(); - } - - public Event read(Unpacker u, Event to, boolean required) throws IOException { - throw new UnsupportedOperationException("Don't need the operation"); - } - } } diff --git a/src/main/java/org/fluentd/logger/sender/EventTemplate.java b/src/main/java/org/fluentd/logger/sender/EventTemplate.java new file mode 100644 index 0000000..44dd7a6 --- /dev/null +++ b/src/main/java/org/fluentd/logger/sender/EventTemplate.java @@ -0,0 +1,38 @@ +package org.fluentd.logger.sender; + +import java.io.IOException; + +import org.msgpack.MessageTypeException; +import org.msgpack.packer.Packer; +import org.msgpack.template.AbstractTemplate; +import org.msgpack.template.Templates; +import org.msgpack.unpacker.Unpacker; + +public abstract class EventTemplate extends AbstractTemplate { + public static EventTemplate INSTANCE = new DefaultEventTemplate(); + + public void write(Packer pk, Event v, boolean required) throws IOException { + if (v == null) { + if (required) { + throw new MessageTypeException("Attempted to write null"); + } + pk.writeNil(); + return; + } + + pk.writeArrayBegin(3); + { + Templates.TString.write(pk, v.tag, required); + Templates.TLong.write(pk, v.timestamp, required); + doWriteData(pk, v.data, required); + } + pk.writeArrayEnd(); + } + + protected abstract void doWriteData(Packer pk, Object data, boolean required) + throws IOException; + + public Event read(Unpacker u, Event to, boolean required) throws IOException { + throw new UnsupportedOperationException("Don't need the operation"); + } +} diff --git a/src/main/java/org/fluentd/logger/sender/MapStyleEventTemplate.java b/src/main/java/org/fluentd/logger/sender/MapStyleEventTemplate.java new file mode 100644 index 0000000..151d2a1 --- /dev/null +++ b/src/main/java/org/fluentd/logger/sender/MapStyleEventTemplate.java @@ -0,0 +1,75 @@ +package org.fluentd.logger.sender; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.msgpack.MessageTypeException; +import org.msgpack.packer.Packer; +import org.msgpack.template.Templates; + +public class MapStyleEventTemplate extends EventTemplate { + @Override + protected void doWriteData(Packer pk, Object data, boolean required) throws IOException { + if(data instanceof Map){ + writeMap(pk, (Map)data, required); + } else{ + try{ + pk.write(data); + } catch (MessageTypeException e) { + writeObj(pk, data, required); + } + } + } + + private void writeMap(Packer pk, Map map, boolean required) throws IOException { + pk.writeMapBegin(map.size()); + { + for (Map.Entry entry : map.entrySet()) { + Templates.TString.write(pk, entry.getKey().toString(), required); + Object value = entry.getValue(); + if(value instanceof Map){ + writeMap(pk, (Map)value, required); + } else{ + try { + pk.write(entry.getValue()); + } catch (MessageTypeException e) { + writeObj(pk, entry.getValue(), required); + } + } + } + } + pk.writeMapEnd(); + } + + private void writeObj(Packer pk, Object data, boolean required) throws IOException { + Map map = new LinkedHashMap(); + Class clazz = data.getClass(); + while(!clazz.equals(Object.class)){ + for(Method m : clazz.getDeclaredMethods()){ + if(m.getDeclaringClass().equals(Object.class)) continue; + if(m.getParameterTypes().length != 0) continue; + String name = null; + if(m.getName().startsWith("get")){ + name = m.getName().substring(3); + } else if(m.getName().startsWith("is") && m.getReturnType().equals(boolean.class)){ + name = m.getName().substring(2); + } else{ + continue; + } + if(name.length() == 0) continue; + name = name.substring(0, 1).toLowerCase() + (name.length() == 1 ? "" : name.substring(1)); + try { + map.put(name, m.invoke(data)); + } catch (IllegalArgumentException e) { + } catch (IllegalAccessException e) { + } catch (InvocationTargetException e) { + } + } + clazz = clazz.getSuperclass(); + } + writeMap(pk, map, required); + } +} diff --git a/src/main/java/org/fluentd/logger/sender/NullSender.java b/src/main/java/org/fluentd/logger/sender/NullSender.java index de33679..4c5671c 100644 --- a/src/main/java/org/fluentd/logger/sender/NullSender.java +++ b/src/main/java/org/fluentd/logger/sender/NullSender.java @@ -17,7 +17,6 @@ // package org.fluentd.logger.sender; -import java.util.Map; public class NullSender implements Sender { @@ -25,12 +24,12 @@ public NullSender(String host, int port, int timeout, int bufferCapacity) { } @Override - public boolean emit(String tag, Map data) { + public boolean emit(String tag, Object data) { return emit(tag, System.currentTimeMillis() / 1000, data); } @Override - public boolean emit(String tag, long timestamp, Map data) { + public boolean emit(String tag, long timestamp, Object data) { return true; } diff --git a/src/main/java/org/fluentd/logger/sender/RawSocketSender.java b/src/main/java/org/fluentd/logger/sender/RawSocketSender.java index ba22fef..20b48be 100644 --- a/src/main/java/org/fluentd/logger/sender/RawSocketSender.java +++ b/src/main/java/org/fluentd/logger/sender/RawSocketSender.java @@ -23,7 +23,6 @@ import java.net.Socket; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.util.Map; import java.util.logging.Level; import org.msgpack.MessagePack; @@ -63,7 +62,7 @@ public RawSocketSender(String host, int port, int timeout, int bufferCapacity) { public RawSocketSender(String host, int port, int timeout, int bufferCapacity, Reconnector reconnector) { msgpack = new MessagePack(); - msgpack.register(Event.class, Event.EventTemplate.INSTANCE); + msgpack.register(Event.class, EventTemplate.INSTANCE); pendings = ByteBuffer.allocate(bufferCapacity); server = new InetSocketAddress(host, port); this.reconnector = reconnector; @@ -125,11 +124,11 @@ public void close() { } } - public boolean emit(String tag, Map data) { + public boolean emit(String tag, Object data) { return emit(tag, System.currentTimeMillis() / 1000, data); } - public boolean emit(String tag, long timestamp, Map data) { + public boolean emit(String tag, long timestamp, Object data) { return emit(new Event(tag, timestamp, data)); } diff --git a/src/main/java/org/fluentd/logger/sender/Sender.java b/src/main/java/org/fluentd/logger/sender/Sender.java index 9a6dc42..8a56250 100644 --- a/src/main/java/org/fluentd/logger/sender/Sender.java +++ b/src/main/java/org/fluentd/logger/sender/Sender.java @@ -17,12 +17,10 @@ // package org.fluentd.logger.sender; -import java.util.Map; - public interface Sender { - boolean emit(String tag, Map data); + boolean emit(String tag, Object data); - boolean emit(String tag, long timestamp, Map data); + boolean emit(String tag, long timestamp, Object data); void flush(); diff --git a/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java b/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java index 8130ae2..f60691d 100644 --- a/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java +++ b/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java @@ -67,14 +67,14 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { { Event e = elist.get(0); assertEquals("tag.label1", e.tag); - assertEquals("t1v1", e.data.get("t1k1")); - assertEquals("t1v2", e.data.get("t1k2")); + assertEquals("t1v1", ((Map)e.data).get("t1k1")); + assertEquals("t1v2", ((Map)e.data).get("t1k2")); } { Event e = elist.get(1); assertEquals("tag.label2", e.tag); - assertEquals("t2v1", e.data.get("t2k1")); - assertEquals("t2v2", e.data.get("t2k2")); + assertEquals("t2v1", ((Map)e.data).get("t2k1")); + assertEquals("t2v2", ((Map)e.data).get("t2k2")); } } @@ -205,4 +205,153 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { assertEquals(counts[0], elists[0].size()); assertEquals(counts[1], elists[1].size()); } + + @Test + public void testNormal04_SOMapInSOMap() throws Exception { + List elist = new ArrayList(); + int port = 25225; + MockFluentd mock = createMock(elist, port); + try{ + // start senders + Sender sender = new RawSocketSender("localhost", port); + Map data = new HashMap(); + data.put("t1k1", "t1v1"); + data.put("t1k2", "t1v2"); + Map data2 = new HashMap(); + data2.put("t2k1", "t2v1"); + data2.put("t2k2", "t2v2"); + data.put("data2", data2); + sender.emit("tag.label1", data); + // close sender sockets + sender.close(); + } finally{ + // close mock server sockets + mock.close(); + } + + // wait for unpacking event data on fluentd + Thread.sleep(1000); + + // check data + assertEquals(1, elist.size()); + { + Event e = elist.get(0); + assertEquals("tag.label1", e.tag); + assertEquals("t1v1", ((Map)e.data).get("t1k1")); + assertEquals("t1v2", ((Map)e.data).get("t1k2")); + Map data2 = (Map)((Map)e.data).get("data2"); + assertEquals("t2v1", data2.get("t2k1")); + assertEquals("t2v2", data2.get("t2k2")); + } + } + + @Test + public void testNormal05_WrappersAndString() throws Exception { + List elist = new ArrayList(); + int port = 25225; + MockFluentd mock = createMock(elist, port); + try{ + // start senders + Sender sender = new RawSocketSender("localhost", port); + sender.emit("tag.label1", (short)1); + sender.emit("tag.label1", 2); + sender.emit("tag.label1", 3L); + sender.emit("tag.label1", 4.4f); + sender.emit("tag.label1", 5.5); + sender.emit("tag.label1", true); + sender.emit("tag.label1", 'A'); + sender.emit("tag.label1", "hello"); + // close sender sockets + sender.close(); + } finally{ + // close mock server sockets + mock.close(); + } + + // wait for unpacking event data on fluentd + Thread.sleep(1000); + + // check data + assertEquals(8, elist.size()); + { + int i = 0; + assertEquals((long)1, elist.get(i++).data); + assertEquals((long)2, elist.get(i++).data); + assertEquals((long)3, elist.get(i++).data); + assertEquals(4.4, (Double)elist.get(i++).data, 0.001); + assertEquals(5.5, (Double)elist.get(i++).data, 0.001); + assertEquals(true, elist.get(i++).data); + assertEquals((long)'A', elist.get(i++).data); + assertEquals("hello", elist.get(i++).data); + } + } + + public static class Msg{ + public Msg() { + } + public Msg(String name, int age, boolean live) { + this.name = name; + this.age = age; + this.live = live; + } + public int getAge() { + return age; + } + public String getName() { + return name; + } + public boolean isLive() { + return live; + } + + private String name; + private int age; + private boolean live; + } + @Test + public void testNormal06_Object() throws Exception { + List elist = new ArrayList(); + int port = 25225; + EventTemplate.INSTANCE = new MapStyleEventTemplate(); + MockFluentd mock = createMock(elist, port); + try{ + Sender sender = new RawSocketSender("localhost", port); + sender.emit("tag.label1", new Msg("suzuki", 30, true)); + sender.close(); + } finally{ + mock.close(); + } + + Thread.sleep(1000); + + // check data + assertEquals(1, elist.size()); + { + Map m = (Map)elist.get(0).data; + assertEquals("suzuki", m.get("name")); + assertEquals((long)30, m.get("age")); + assertEquals(true, m.get("live")); + } + } + + private MockFluentd createMock(final List elist, int port) throws IOException{ + // start mock fluentd + MockFluentd fluentd = new MockFluentd(port, new MockFluentd.MockProcess() { + public void process(MessagePack msgpack, Socket socket) throws IOException { + BufferedInputStream in = new BufferedInputStream(socket.getInputStream()); + try { + Unpacker unpacker = msgpack.createUnpacker(in); + while (true) { + Event e = unpacker.read(Event.class); + elist.add(e); + } + //socket.close(); + } catch (EOFException e) { + // ignore + } + } + }); + fluentd.start(); + return fluentd; + } } diff --git a/src/test/java/org/fluentd/logger/sender/TestSenderFluentdDownOperation.java b/src/test/java/org/fluentd/logger/sender/TestSenderFluentdDownOperation.java index abd33c7..1272b58 100644 --- a/src/test/java/org/fluentd/logger/sender/TestSenderFluentdDownOperation.java +++ b/src/test/java/org/fluentd/logger/sender/TestSenderFluentdDownOperation.java @@ -7,10 +7,7 @@ import java.util.HashMap; import java.util.Map; -import org.fluentd.logger.sender.RawSocketSender; -import org.fluentd.logger.sender.Sender; import org.fluentd.logger.util.MockFluentd; -import org.fluentd.logger.util.MockFluentd.MockProcess; import org.junit.Ignore; import org.junit.Test; import org.msgpack.MessagePack; @@ -25,7 +22,7 @@ public class TestSenderFluentdDownOperation { public void testFluentdDownOperation01() throws Exception { int port = 25225; MessagePack msgpack = new MessagePack(); - msgpack.register(Event.class, Event.EventTemplate.INSTANCE); + msgpack.register(Event.class, EventTemplate.INSTANCE); BufferPacker packer = msgpack.createBufferPacker(); long timestamp = System.currentTimeMillis() / 1000; @@ -60,7 +57,7 @@ public void testFluentdDownOperation01() throws Exception { public void testFluentdDownOperation02()throws Exception { int port = 25225; MessagePack msgpack = new MessagePack(); - msgpack.register(Event.class, Event.EventTemplate.INSTANCE); + msgpack.register(Event.class, EventTemplate.INSTANCE); BufferPacker packer = msgpack.createBufferPacker(); long timestamp = System.currentTimeMillis(); diff --git a/src/test/java/org/fluentd/logger/util/MockFluentd.java b/src/test/java/org/fluentd/logger/util/MockFluentd.java index cf39540..102ee1d 100644 --- a/src/test/java/org/fluentd/logger/util/MockFluentd.java +++ b/src/test/java/org/fluentd/logger/util/MockFluentd.java @@ -3,12 +3,13 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; -import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import org.fluentd.logger.sender.DefaultEventTemplate; import org.fluentd.logger.sender.Event; import org.msgpack.MessagePack; -import org.msgpack.packer.Packer; import org.msgpack.template.Templates; import org.msgpack.type.Value; import org.msgpack.unpacker.Unpacker; @@ -19,13 +20,8 @@ public static interface MockProcess { public void process(MessagePack msgpack, Socket socket) throws IOException; } - public static class MockEventTemplate extends Event.EventTemplate { + public static class MockEventTemplate extends DefaultEventTemplate { public static MockEventTemplate INSTANCE = new MockEventTemplate(); - - public void write(Packer pk, Event v, boolean required) throws IOException { - throw new UnsupportedOperationException("don't need operation"); - } - public Event read(Unpacker u, Event to, boolean required) throws IOException { if (!required && u.trySkipNil()) { return null; @@ -36,16 +32,7 @@ public Event read(Unpacker u, Event to, boolean required) throws IOException { { to.tag = Templates.TString.read(u, null, required); to.timestamp = Templates.TLong.read(u, null, required); - int size = u.readMapBegin(); - to.data = new HashMap(size); - { - for (int i = 0; i < size; i++) { - String key = (String) toObject(u, u.readValue()); - Object value = toObject(u, u.readValue()); - to.data.put(key, value); - } - } - u.readMapEnd(); + to.data = toObject(u, u.readValue()); } u.readArrayEnd(); return to; @@ -64,7 +51,14 @@ private static Object toObject(Unpacker u, Value v) { } else if (v.isIntegerValue()) { return v.asIntegerValue().getLong(); // long only } else if (v.isMapValue()) { - throw new UnsupportedOperationException(); + Map map = new LinkedHashMap(); + Value[] vals = v.asMapValue().asMapValue().getKeyValueArray(); + for(int i = 0; i < (vals.length / 2); i++){ + String key = vals[i * 2].asRawValue().getString(); + Object value = toObject(u, vals[i * 2 + 1]); + map.put(key, value); + } + return map; } else if (v.isArrayValue()) { throw new UnsupportedOperationException(); } else {