diff --git a/client/src/main/java/cn/edu/tsinghua/iginx/client/IginxClient.java b/client/src/main/java/cn/edu/tsinghua/iginx/client/IginxClient.java index f64c00485..169a22f33 100644 --- a/client/src/main/java/cn/edu/tsinghua/iginx/client/IginxClient.java +++ b/client/src/main/java/cn/edu/tsinghua/iginx/client/IginxClient.java @@ -225,7 +225,7 @@ private static void processCommand(String command) { try { List paths = getTimeseries(); - Set timestamps = new HashSet(); + Set timestamps = new HashSet<>(); for (int i = 0; i < paths.size(); i += MAX_GETDATA_NUM) { List ins = new ArrayList<>(); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/combine/querydata/QueryDataSetCombiner.java b/core/src/main/java/cn/edu/tsinghua/iginx/combine/querydata/QueryDataSetCombiner.java index 63a7ba97b..0667e66dd 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/combine/querydata/QueryDataSetCombiner.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/combine/querydata/QueryDataSetCombiner.java @@ -20,6 +20,7 @@ import cn.edu.tsinghua.iginx.exceptions.ExecutionException; import cn.edu.tsinghua.iginx.exceptions.StatusCode; +import cn.edu.tsinghua.iginx.metadata.TagTools; import cn.edu.tsinghua.iginx.query.result.QueryDataPlanExecuteResult; import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.thrift.QueryDataResp; @@ -27,6 +28,7 @@ import cn.edu.tsinghua.iginx.utils.Bitmap; import cn.edu.tsinghua.iginx.utils.ByteUtils; import cn.edu.tsinghua.iginx.utils.CheckedFunction; +import cn.edu.tsinghua.iginx.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -164,7 +166,9 @@ public void combineResult(QueryDataResp resp, List p } } } - resp.setPaths(columnNameList); + List>> pathAndTagsList = TagTools.splitTags(columnNameList); + resp.setPaths(pathAndTagsList.stream().map(e -> e.k).collect(Collectors.toList())); + resp.setTagsList(pathAndTagsList.stream().map(e -> e.v).collect(Collectors.toList())); resp.setDataTypeList(columnTypeList); resp.setQueryDataSet(new QueryDataSet(ByteUtils.getColumnByteBuffer(timestamps.toArray(), DataType.LONG), valuesList, bitmapList)); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/conf/Constants.java b/core/src/main/java/cn/edu/tsinghua/iginx/conf/Constants.java index 55eee7480..55cf05ec8 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/conf/Constants.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/conf/Constants.java @@ -52,5 +52,11 @@ public class Constants { public static final String SCHEMA_MAPPING_PREFIX = "/schema"; + public static final String TAG_PREFIX = "@"; + + public static final String VALUE_PREFIX = "$"; + + public static final String DOT = "."; + } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/TagTools.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/TagTools.java new file mode 100644 index 000000000..31a72ccff --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/TagTools.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package cn.edu.tsinghua.iginx.metadata; + +import cn.edu.tsinghua.iginx.conf.Constants; +import cn.edu.tsinghua.iginx.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class TagTools { + + private static final Logger logger = LoggerFactory.getLogger(TagTools.class); + + public boolean hasTags(String path) { + return path.contains(Constants.TAG_PREFIX); + } + + public static Pair> splitTags(String pathWithTags) { + int splitIndex = pathWithTags.indexOf(Constants.TAG_PREFIX); + if (splitIndex == -1) { // 没有 tag + return new Pair<>(pathWithTags, Collections.emptyMap()); + } + String path = pathWithTags.substring(0, splitIndex - 1); + List tagKList = new ArrayList<>(); + List tagVList = new ArrayList<>(); + + String[] tagKVs = pathWithTags.substring(splitIndex).split("\\."); + for (String tagKOrV: tagKVs) { + if (tagKOrV.startsWith(Constants.TAG_PREFIX)) { + tagKList.add(tagKOrV.substring(Constants.TAG_PREFIX.length())); + } else if (tagKOrV.startsWith(Constants.VALUE_PREFIX)) { + tagVList.add(tagKOrV.substring(Constants.VALUE_PREFIX.length())); + } else { + throw new IllegalArgumentException("unexpected path with tags: " + pathWithTags); + } + } + if (tagKList.size() != tagVList.size()) { + throw new IllegalArgumentException("unexpected path with tags: " + pathWithTags); + } + Map tags = new HashMap<>(); + for (int i = 0; i < tagKList.size(); i++) { + tags.put(tagKList.get(i), tagVList.get(i)); + } + return new Pair<>(path, tags); + } + + public static List>> splitTags(List pathsWithTags) { + if (pathsWithTags == null) { + return Collections.emptyList(); + } + return pathsWithTags.stream().map(TagTools::splitTags).collect(Collectors.toList()); + } + + public static List concatTags(List paths, List> tagsList) { + if (tagsList == null || tagsList.size() == 0) { + return paths; + } + List pathsWithTags = new ArrayList<>(); + for (int i = 0; i < paths.size(); i++) { + String path = paths.get(i); + Map tags = tagsList.get(i); + if (tags == null || tags.size() == 0) { + pathsWithTags.add(path); + continue; + } + + List> tagList = tags.entrySet().stream().sorted(Map.Entry.comparingByKey()).collect(Collectors.toList()); + StringBuilder tagKBuilder = new StringBuilder(); + StringBuilder tagVBuilder = new StringBuilder(); + + for (Map.Entry tag : tagList) { + tagKBuilder.append(Constants.DOT); + tagKBuilder.append(Constants.TAG_PREFIX); + tagKBuilder.append(tag.getKey()); + + tagVBuilder.append(Constants.DOT); + tagVBuilder.append(Constants.VALUE_PREFIX); + tagVBuilder.append(tag.getValue()); + } + pathsWithTags.add(path + tagKBuilder.toString() + tagVBuilder.toString()); + } + return pathsWithTags; + } + + public static String toString(String path, Map tags) { + if (tags == null || tags.isEmpty()) { + return path; + } + StringBuilder builder = new StringBuilder(path); + List> tagList = tags.entrySet().stream().sorted(Map.Entry.comparingByKey()).collect(Collectors.toList()); + for (int i = 0; i < tagList.size(); i++) { + Map.Entry tag = tagList.get(i); + if (i == 0) { + builder.append("{"); + } else { + builder.append(", "); + } + builder.append(tag.getKey()); + builder.append("="); + builder.append(tag.getValue()); + } + builder.append('}'); + return builder.toString(); + } + + public static void main(String[] args) { + List paths = Arrays.asList("cpu.usage", "memory.usage", "java"); + List> tagsList = new ArrayList<>(); + + Map tags1 = new HashMap<>(); + tags1.put("host", "1"); + tagsList.add(tags1); + + Map tags2 = new HashMap<>(); + tags2.put("host", "1"); + tags2.put("type", "ddr4"); + tags2.put("producer", "Samsung"); + tagsList.add(tags2); + + Map tags3 = new HashMap<>(); + tagsList.add(tags3); + + List pathsWithTags = concatTags(paths, tagsList); + for (String pathWithTags: pathsWithTags) { + System.out.println(pathWithTags); + Pair> pathAndTags = splitTags(pathWithTags); + System.out.println(pathAndTags.k); + System.out.println(pathAndTags.v); + System.out.println(toString(pathAndTags.k, pathAndTags.v)); + } + System.out.println(); + } + +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/plan/InsertColumnRecordsPlan.java b/core/src/main/java/cn/edu/tsinghua/iginx/plan/InsertColumnRecordsPlan.java index b584aedba..be5b7b86d 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/plan/InsertColumnRecordsPlan.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/plan/InsertColumnRecordsPlan.java @@ -39,14 +39,14 @@ public class InsertColumnRecordsPlan extends InsertRecordsPlan { private static final Logger logger = LoggerFactory.getLogger(InsertColumnRecordsPlan.class); public InsertColumnRecordsPlan(List paths, long[] timestamps, Object[] valuesList, List bitmapList, - List dataTypeList, List> attributesList, StorageUnitMeta storageUnit) { - super(paths, timestamps, valuesList, bitmapList, dataTypeList, attributesList, storageUnit); + List dataTypeList, List> tagsList, StorageUnitMeta storageUnit) { + super(paths, timestamps, valuesList, bitmapList, dataTypeList, tagsList, storageUnit); this.setIginxPlanType(INSERT_COLUMN_RECORDS); } public InsertColumnRecordsPlan(List paths, long[] timestamps, Object[] valuesList, List bitmapList, - List dataTypeList, List> attributesList) { - this(paths, timestamps, valuesList, bitmapList, dataTypeList, attributesList, null); + List dataTypeList, List> tagsList) { + this(paths, timestamps, valuesList, bitmapList, dataTypeList, tagsList, null); } public Pair> getValuesAndBitmapsByIndexes(Pair rowIndexes, TimeSeriesInterval interval) { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/plan/InsertRecordsPlan.java b/core/src/main/java/cn/edu/tsinghua/iginx/plan/InsertRecordsPlan.java index a1449e1c7..e9c2c82be 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/plan/InsertRecordsPlan.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/plan/InsertRecordsPlan.java @@ -18,6 +18,7 @@ */ package cn.edu.tsinghua.iginx.plan; +import cn.edu.tsinghua.iginx.metadata.TagTools; import cn.edu.tsinghua.iginx.metadata.entity.StorageUnitMeta; import cn.edu.tsinghua.iginx.metadata.entity.TimeInterval; import cn.edu.tsinghua.iginx.metadata.entity.TimeSeriesInterval; @@ -49,17 +50,17 @@ public abstract class InsertRecordsPlan extends DataPlan { private List dataTypeList; - private List> attributesList; + private List> tagsList; protected InsertRecordsPlan(List paths, long[] timestamps, Object[] valuesList, List bitmapList, - List dataTypeList, List> attributesList, StorageUnitMeta storageUnit) { - super(false, paths, timestamps[0], timestamps[timestamps.length - 1], storageUnit); + List dataTypeList, List> tagsList, StorageUnitMeta storageUnit) { + super(false, TagTools.concatTags(paths, tagsList), timestamps[0], timestamps[timestamps.length - 1], storageUnit); this.setIginxPlanType(INSERT_RECORDS); this.timestamps = timestamps; this.valuesList = valuesList; this.bitmapList = bitmapList; this.dataTypeList = dataTypeList; - this.attributesList = attributesList; + this.tagsList = tagsList; } public long getTimestamp(int index) { @@ -151,19 +152,19 @@ public List getDataTypeListByInterval(TimeSeriesInterval interval) { } public Map getAttributes(int index) { - if (attributesList == null || attributesList.isEmpty()) { + if (tagsList == null || tagsList.isEmpty()) { logger.info("There are no attributes in the InsertRecordsPlan."); return null; } - if (index < 0 || index >= attributesList.size()) { + if (index < 0 || index >= tagsList.size()) { logger.error("The given index {} is out of bounds.", index); return null; } - return attributesList.get(index); + return tagsList.get(index); } - public List> getAttributesByInterval(TimeSeriesInterval interval) { - if (attributesList == null || attributesList.isEmpty()) { + public List> getTagsByInterval(TimeSeriesInterval interval) { + if (tagsList == null || tagsList.isEmpty()) { logger.info("There are no attributes in the InsertRecordsPlan."); return null; } @@ -177,7 +178,7 @@ public List> getAttributesByInterval(TimeSeriesInterval inte endIndex = i; } } - return attributesList.subList(startIndex, endIndex + 1); + return tagsList.subList(startIndex, endIndex + 1); } } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/plan/InsertRowRecordsPlan.java b/core/src/main/java/cn/edu/tsinghua/iginx/plan/InsertRowRecordsPlan.java index e4e10c70d..6713b3c22 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/plan/InsertRowRecordsPlan.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/plan/InsertRowRecordsPlan.java @@ -39,14 +39,14 @@ public class InsertRowRecordsPlan extends InsertRecordsPlan { private static final Logger logger = LoggerFactory.getLogger(InsertRowRecordsPlan.class); public InsertRowRecordsPlan(List paths, long[] timestamps, Object[] valuesList, List bitmapList, - List dataTypeList, List> attributesList, StorageUnitMeta storageUnit) { - super(paths, timestamps, valuesList, bitmapList, dataTypeList, attributesList, storageUnit); + List dataTypeList, List> tagsList, StorageUnitMeta storageUnit) { + super(paths, timestamps, valuesList, bitmapList, dataTypeList, tagsList, storageUnit); this.setIginxPlanType(INSERT_ROW_RECORDS); } public InsertRowRecordsPlan(List paths, long[] timestamps, Object[] valuesList, List bitmapList, - List dataTypeList, List> attributesList) { - this(paths, timestamps, valuesList, bitmapList, dataTypeList, attributesList, null); + List dataTypeList, List> tagsList) { + this(paths, timestamps, valuesList, bitmapList, dataTypeList, tagsList, null); } public Pair> getValuesAndBitmapsByIndexes(Pair rowIndexes, TimeSeriesInterval interval) { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/plan/QueryDataPlan.java b/core/src/main/java/cn/edu/tsinghua/iginx/plan/QueryDataPlan.java index 387b2bb09..5b9a1a982 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/plan/QueryDataPlan.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/plan/QueryDataPlan.java @@ -18,6 +18,7 @@ */ package cn.edu.tsinghua.iginx.plan; +import cn.edu.tsinghua.iginx.metadata.TagTools; import cn.edu.tsinghua.iginx.metadata.entity.StorageUnitMeta; import cn.edu.tsinghua.iginx.metadata.entity.TimeSeriesInterval; import org.slf4j.Logger; @@ -25,6 +26,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import static cn.edu.tsinghua.iginx.plan.IginxPlan.IginxPlanType.QUERY_DATA; @@ -32,9 +34,10 @@ public class QueryDataPlan extends DataPlan { private static final Logger logger = LoggerFactory.getLogger(QueryDataPlan.class); - public QueryDataPlan(List paths, long startTime, long endTime) { - super(true, paths, startTime, endTime, null); + public QueryDataPlan(List paths, List> tagsList, long startTime, long endTime) { + super(true, TagTools.concatTags(paths, tagsList), startTime, endTime, null); this.setIginxPlanType(QUERY_DATA); + paths = this.getPaths(); boolean isStartPrefix = paths.get(0).contains("*"); String startTimeSeries = trimPath(paths.get(0)); boolean isEndPrefix = paths.get(getPathsNum() - 1).contains("*"); @@ -60,8 +63,8 @@ public QueryDataPlan(List paths, long startTime, long endTime) { this.setTsInterval(new TimeSeriesInterval(startTimeSeries, endTimeSeries)); } - public QueryDataPlan(List paths, long startTime, long endTime, StorageUnitMeta storageUnit) { - this(paths, startTime, endTime); + public QueryDataPlan(List paths, List> tagsList, long startTime, long endTime, StorageUnitMeta storageUnit) { + this(paths, tagsList, startTime, endTime); this.setStorageUnit(storageUnit); this.setSync(true); } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/rest/RestSession.java b/core/src/main/java/cn/edu/tsinghua/iginx/rest/RestSession.java index 0e2830342..93e985ba0 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/rest/RestSession.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/rest/RestSession.java @@ -264,7 +264,7 @@ public void insertColumnRecords(List paths, long[] timestamps, Object[] req.setValuesList(valueBufferList); req.setBitmapList(bitmapBufferList); req.setDataTypeList(dataTypeList); - req.setAttributesList(attributesList); + req.setTagsList(attributesList); Status status; do { @@ -333,7 +333,7 @@ public void insertRowRecords(List paths, long[] timestamps, Object[] val req.setValuesList(valueBufferList); req.setBitmapList(bitmapBufferList); req.setDataTypeList(dataTypeList); - req.setAttributesList(attributesList); + req.setTagsList(attributesList); Status status; do { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/session/Session.java b/core/src/main/java/cn/edu/tsinghua/iginx/session/Session.java index b30830d57..c4ae03275 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/session/Session.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/session/Session.java @@ -370,7 +370,7 @@ public void deleteColumns(List paths) throws SessionException, Execution } public void insertColumnRecords(List paths, long[] timestamps, Object[] valuesList, - List dataTypeList, List> attributesList) throws SessionException, ExecutionException { + List dataTypeList, List> tagsList) throws SessionException, ExecutionException { if (paths.isEmpty() || timestamps.length == 0 || valuesList.length == 0 || dataTypeList.isEmpty()) { logger.error("Invalid insert request!"); return; @@ -379,7 +379,7 @@ public void insertColumnRecords(List paths, long[] timestamps, Object[] logger.error("The sizes of paths, valuesList and dataTypeList should be equal."); return; } - if (attributesList != null && paths.size() != attributesList.size()) { + if (tagsList != null && paths.size() != tagsList.size()) { logger.error("The sizes of paths, valuesList, dataTypeList and attributesList should be equal."); return; } @@ -406,14 +406,14 @@ public void insertColumnRecords(List paths, long[] timestamps, Object[] Collections.sort(paths); Object[] sortedValuesList = new Object[valuesList.length]; List sortedDataTypeList = new ArrayList<>(); - List> sortedAttributesList = new ArrayList<>(); + List> sortedTagsList = new ArrayList<>(); for (int i = 0; i < valuesList.length; i++) { sortedValuesList[i] = valuesList[index[i]]; sortedDataTypeList.add(dataTypeList.get(index[i])); } - if (attributesList != null) { + if (tagsList != null) { for (Integer i : index) { - sortedAttributesList.add(attributesList.get(i)); + sortedTagsList.add(tagsList.get(i)); } } @@ -442,7 +442,7 @@ public void insertColumnRecords(List paths, long[] timestamps, Object[] req.setValuesList(valueBufferList); req.setBitmapList(bitmapBufferList); req.setDataTypeList(sortedDataTypeList); - req.setAttributesList(sortedAttributesList); + req.setTagsList(sortedTagsList); try { Status status; @@ -461,7 +461,7 @@ public void insertColumnRecords(List paths, long[] timestamps, Object[] } public void insertRowRecords(List paths, long[] timestamps, Object[] valuesList, - List dataTypeList, List> attributesList) throws SessionException, ExecutionException { + List dataTypeList, List> tagsList) throws SessionException, ExecutionException { if (paths.isEmpty() || timestamps.length == 0 || valuesList.length == 0 || dataTypeList.isEmpty()) { logger.error("Invalid insert request!"); return; @@ -474,7 +474,7 @@ public void insertRowRecords(List paths, long[] timestamps, Object[] val logger.error("The sizes of timestamps and valuesList should be equal."); return; } - if (attributesList != null && paths.size() != attributesList.size()) { + if (tagsList != null && paths.size() != tagsList.size()) { logger.error("The sizes of paths, valuesList, dataTypeList and attributesList should be equal."); return; } @@ -497,7 +497,7 @@ public void insertRowRecords(List paths, long[] timestamps, Object[] val Arrays.sort(index, Comparator.comparing(paths::get)); Collections.sort(paths); List sortedDataTypeList = new ArrayList<>(); - List> sortedAttributesList = new ArrayList<>(); + List> sortedTagsList = new ArrayList<>(); for (int i = 0; i < sortedValuesList.length; i++) { Object[] values = new Object[index.length]; for (int j = 0; j < index.length; j++) { @@ -508,9 +508,9 @@ public void insertRowRecords(List paths, long[] timestamps, Object[] val for (Integer i : index) { sortedDataTypeList.add(dataTypeList.get(i)); } - if (attributesList != null) { + if (tagsList != null) { for (Integer i : index) { - sortedAttributesList.add(attributesList.get(i)); + sortedTagsList.add(tagsList.get(i)); } } @@ -539,7 +539,7 @@ public void insertRowRecords(List paths, long[] timestamps, Object[] val req.setValuesList(valueBufferList); req.setBitmapList(bitmapBufferList); req.setDataTypeList(sortedDataTypeList); - req.setAttributesList(sortedAttributesList); + req.setTagsList(sortedTagsList); try { Status status; @@ -585,12 +585,31 @@ public void deleteDataInColumns(List paths, long startTime, long endTime public SessionQueryDataSet queryData(List paths, long startTime, long endTime) throws SessionException, ExecutionException { + return queryData(paths, null, startTime, endTime); + } + + + public SessionQueryDataSet queryData(List paths, List> tagsList, long startTime, long endTime) + throws SessionException, ExecutionException { if (paths.isEmpty() || startTime > endTime) { logger.error("Invalid query request!"); return null; } + + Integer[] index = new Integer[paths.size()]; + for (int i = 0; i < paths.size(); i++) { + index[i] = i; + } + Arrays.sort(index, Comparator.comparing(paths::get)); Collections.sort(paths); + List> sortedTagsList = new ArrayList<>(); + if (tagsList != null) { + for (Integer i : index) { + sortedTagsList.add(tagsList.get(i)); + } + } QueryDataReq req = new QueryDataReq(sessionId, paths, startTime, endTime); + req.setTagsList(sortedTagsList); QueryDataResp resp; diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/session/SessionQueryDataSet.java b/core/src/main/java/cn/edu/tsinghua/iginx/session/SessionQueryDataSet.java index 1484b8905..2b706f400 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/session/SessionQueryDataSet.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/session/SessionQueryDataSet.java @@ -18,6 +18,7 @@ */ package cn.edu.tsinghua.iginx.session; +import cn.edu.tsinghua.iginx.metadata.TagTools; import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.thrift.DownsampleQueryResp; import cn.edu.tsinghua.iginx.thrift.QueryDataResp; @@ -27,6 +28,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static cn.edu.tsinghua.iginx.utils.ByteUtils.getLongArrayFromByteBuffer; import static cn.edu.tsinghua.iginx.utils.ByteUtils.getValueFromByteBufferByDataType; @@ -36,10 +38,12 @@ public class SessionQueryDataSet { private final long[] timestamps; private List paths; private List> values; + private List> tagsList; public SessionQueryDataSet(QueryDataResp resp) { this.paths = resp.getPaths(); this.timestamps = getLongArrayFromByteBuffer(resp.queryDataSet.timestamps); + this.tagsList = resp.getTagsList(); parseValues(resp.dataTypeList, resp.queryDataSet.valuesList, resp.queryDataSet.bitmapList); } @@ -97,8 +101,13 @@ public List> getValues() { public void print() { System.out.println("Start to Print ResultSets:"); System.out.print("Time\t"); - for (String path : paths) { - System.out.print(path + "\t"); + for (int i = 0; i < paths.size(); i++) { + String path = paths.get(i); + if (tagsList != null && !tagsList.isEmpty()) { + System.out.print(TagTools.toString(path, tagsList.get(i)) + "\t"); + } else { + System.out.print(path + "\t"); + } } System.out.println(); @@ -115,4 +124,5 @@ public void print() { } System.out.println("Printing ResultSets Finished."); } + } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/split/SimplePlanGenerator.java b/core/src/main/java/cn/edu/tsinghua/iginx/split/SimplePlanGenerator.java index a3c18e72c..8000cce11 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/split/SimplePlanGenerator.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/split/SimplePlanGenerator.java @@ -28,7 +28,6 @@ import cn.edu.tsinghua.iginx.core.context.InsertRowRecordsContext; import cn.edu.tsinghua.iginx.core.context.QueryDataContext; import cn.edu.tsinghua.iginx.core.context.RequestContext; -import cn.edu.tsinghua.iginx.core.context.ShowColumnsContext; import cn.edu.tsinghua.iginx.core.context.ValueFilterQueryContext; import cn.edu.tsinghua.iginx.plan.AddColumnsPlan; import cn.edu.tsinghua.iginx.plan.AvgQueryPlan; @@ -111,7 +110,7 @@ public List generateSubPlans(RequestContext requestContext) getColumnValuesByDataType(insertColumnRecordsReq.getValuesList(), insertColumnRecordsReq.getDataTypeList(), insertColumnRecordsReq.getBitmapList(), timestamps.length), insertColumnRecordsReq.getBitmapList().stream().map(x -> new Bitmap(timestamps.length, x.array())).collect(Collectors.toList()), insertColumnRecordsReq.getDataTypeList(), - insertColumnRecordsReq.getAttributesList() + insertColumnRecordsReq.getTagsList() ); splitInfoList = planSplitter.getSplitInsertColumnRecordsPlanResults(insertColumnRecordsPlan); return splitInsertColumnRecordsPlan(insertColumnRecordsPlan, splitInfoList); @@ -123,7 +122,7 @@ public List generateSubPlans(RequestContext requestContext) getRowValuesByDataType(insertRowRecordsReq.getValuesList(), insertRowRecordsReq.getDataTypeList(), insertRowRecordsReq.getBitmapList()), insertRowRecordsReq.getBitmapList().stream().map(x -> new Bitmap(insertRowRecordsReq.getPathsSize(), x.array())).collect(Collectors.toList()), insertRowRecordsReq.getDataTypeList(), - insertRowRecordsReq.getAttributesList() + insertRowRecordsReq.getTagsList() ); splitInfoList = planSplitter.getSplitInsertRowRecordsPlanResults(insertRowRecordsPlan); return splitInsertRowRecordsPlan(insertRowRecordsPlan, splitInfoList); @@ -140,6 +139,7 @@ public List generateSubPlans(RequestContext requestContext) QueryDataReq queryDataReq = ((QueryDataContext) requestContext).getReq(); QueryDataPlan queryDataPlan = new QueryDataPlan( queryDataReq.getPaths(), + queryDataReq.getTagsList(), queryDataReq.getStartTime(), queryDataReq.getEndTime() ); @@ -330,7 +330,7 @@ public List splitInsertColumnRecordsPlan(InsertColumnRe valuesAndBitmaps.k, valuesAndBitmaps.v, plan.getDataTypeListByInterval(info.getTimeSeriesInterval()), - plan.getAttributesByInterval(info.getTimeSeriesInterval()), + null, info.getStorageUnit() ); subPlan.setSync(info.getStorageUnit().isMaster()); @@ -350,7 +350,7 @@ public List splitInsertRowRecordsPlan(InsertRowRecordsPlan valuesAndBitmaps.k, valuesAndBitmaps.v, plan.getDataTypeListByInterval(info.getTimeSeriesInterval()), - plan.getAttributesByInterval(info.getTimeSeriesInterval()), + null, info.getStorageUnit() ); subPlan.setSync(info.getStorageUnit().isMaster()); @@ -378,6 +378,7 @@ public List splitQueryDataPlan(QueryDataPlan plan, List measurements = new ArrayList<>(); + + private static final List> tagsList = new ArrayList<>(); + + private static final long COLUMN_START_TIMESTAMP = 0L; + + private static final long COLUMN_END_TIMESTAMP = 100L; + + private static final long ROW_START_TIMESTAMP = 200L; + + private static final long ROW_END_TIMESTAMP = 300L; + + static { + measurements.add("usage.memory"); + measurements.add("usage.cpu"); + measurements.add("usage.disk"); + + Map tags1 = new HashMap<>(); + tags1.put("host", "1"); + tags1.put("center", "beijing"); + tagsList.add(tags1); + + Map tags2 = new HashMap<>(); + tags2.put("host", "2"); + tags2.put("center", "beijing"); + tagsList.add(tags2); + + Map tags3 = new HashMap<>(); + tags3.put("host", "1"); + tags3.put("center", "shanghai"); + tagsList.add(tags3); + } + + private static List>> generatePathAndTagsList() { + List>> pathAndTagsList = new ArrayList<>(); +// for (String measurement : measurements) { +// for (Map stringStringMap : tagsList) { +// pathAndTagsList.add(new Pair<>(measurement, new HashMap<>(stringStringMap))); +// } +// } + for (int i = 0; i < measurements.size(); i++) { + pathAndTagsList.add(new Pair<>(measurements.get(i), tagsList.get(i))); + } + return pathAndTagsList; + } + + private static Session session; + + public static void main(String[] args) throws Exception { + session = new Session("127.0.0.1", 6888, "root", "root"); + + session.openSession(); + + insertRowRecords(); + + insertColumnRecords(); + + queryData(); + + session.closeSession(); + } + + private static void insertColumnRecords() throws Exception { + List>> pathAndTagsList = generatePathAndTagsList(); + + long[] timestamps = new long[(int)(COLUMN_END_TIMESTAMP - COLUMN_START_TIMESTAMP)]; + for (long i = COLUMN_START_TIMESTAMP; i < COLUMN_END_TIMESTAMP; i++) { + timestamps[(int)(i - COLUMN_START_TIMESTAMP)] = i; + } + + Object[] valuesList = new Object[pathAndTagsList.size()]; + for (int i = 0; i < valuesList.length; i++) { + Object[] values = new Object[timestamps.length]; + for (int j = 0; j < values.length; j++) { + values[j] = (long)(i + j + COLUMN_START_TIMESTAMP); + } + valuesList[i] = values; + } + + List dataTypeList = new ArrayList<>(); + for (int i = 0; i < pathAndTagsList.size(); i++) { + dataTypeList.add(DataType.LONG); + } + + session.insertColumnRecords(pathAndTagsList.stream().map(e -> e.k).collect(Collectors.toList()), timestamps, valuesList, dataTypeList, + pathAndTagsList.stream().map(e -> e.v).collect(Collectors.toList())); + } + + private static void insertRowRecords() throws Exception { + List>> pathAndTagsList = generatePathAndTagsList(); + + long[] timestamps = new long[(int)(ROW_END_TIMESTAMP - ROW_START_TIMESTAMP)]; + for (long i = ROW_START_TIMESTAMP; i < ROW_END_TIMESTAMP; i++) { + timestamps[(int)(i - ROW_START_TIMESTAMP)] = i; + } + + Object[] valuesList = new Object[timestamps.length]; + for (int i = 0; i < valuesList.length; i++) { + Object[] values = new Object[pathAndTagsList.size()]; + for (int j = 0; j < values.length; j++) { + values[j] = (long)(i + j + ROW_START_TIMESTAMP); + } + valuesList[i] = values; + } + + List dataTypeList = new ArrayList<>(); + for (int i = 0; i < pathAndTagsList.size(); i++) { + dataTypeList.add(DataType.LONG); + } + + session.insertRowRecords(pathAndTagsList.stream().map(e -> e.k).collect(Collectors.toList()), timestamps, valuesList, dataTypeList, + pathAndTagsList.stream().map(e -> e.v).collect(Collectors.toList())); + } + + private static void queryData() throws Exception { + List>> pathAndTagsList = generatePathAndTagsList(); + long startTime = COLUMN_START_TIMESTAMP + ((COLUMN_END_TIMESTAMP - COLUMN_START_TIMESTAMP) >> 1); + long endTime = ROW_END_TIMESTAMP - ((ROW_END_TIMESTAMP - ROW_START_TIMESTAMP) >> 1); + + System.out.println("query range from " + startTime + " to " + endTime); + + SessionQueryDataSet dataSet = session.queryData(pathAndTagsList.stream().map(e -> e.k).collect(Collectors.toList()), pathAndTagsList.stream().map(e -> e.v).collect(Collectors.toList()), + startTime, endTime); + dataSet.print(); + } + + +} diff --git a/thrift/src/main/proto/rpc.thrift b/thrift/src/main/proto/rpc.thrift index 8369ee764..71a793439 100644 --- a/thrift/src/main/proto/rpc.thrift +++ b/thrift/src/main/proto/rpc.thrift @@ -72,7 +72,7 @@ struct InsertColumnRecordsReq { 4: required list valuesList 5: required list bitmapList 6: required list dataTypeList - 7: optional list> attributesList + 7: optional list> tagsList } struct InsertRowRecordsReq { @@ -82,7 +82,7 @@ struct InsertRowRecordsReq { 4: required list valuesList 5: required list bitmapList 6: required list dataTypeList - 7: optional list> attributesList + 7: optional list> tagsList } struct DeleteDataInColumnsReq { @@ -103,6 +103,7 @@ struct QueryDataReq { 2: required list paths 3: required i64 startTime 4: required i64 endTime + 5: optional list> tagsList } struct QueryDataResp { @@ -110,6 +111,7 @@ struct QueryDataResp { 2: optional list paths 3: optional list dataTypeList 4: optional QueryDataSet queryDataSet + 5: optional list> tagsList } struct AddStorageEngineReq {