Skip to content

Commit

Permalink
Merge branch 'master' into 20240726_fix_memory
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Jul 30, 2024
2 parents 66b115c + 21a3db6 commit dc70771
Show file tree
Hide file tree
Showing 27 changed files with 878 additions and 42 deletions.
3 changes: 2 additions & 1 deletion be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,7 @@ Status NewJsonReader::_simdjson_handle_flat_array_complex_json_write_columns(
while (true) {
cur = (*_array_iter).get_object();
// extract root
if (!_parsed_json_root.empty()) {
if (!_parsed_from_json_root && !_parsed_json_root.empty()) {
simdjson::ondemand::value val;
Status st = JsonFunctions::extract_from_object(cur, _parsed_json_root, &val);
if (UNLIKELY(!st.ok())) {
Expand Down Expand Up @@ -1611,6 +1611,7 @@ Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_c
fmt::format_to(error_msg, "{}", st.to_string());
return return_quality_error(error_msg, std::string((char*)_json_str, *size));
}
_parsed_from_json_root = true;
} catch (simdjson::simdjson_error& e) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Encounter error while extract_from_object, error: {}",
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/json/new_json_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ class NewJsonReader : public GenericReader {

std::vector<std::vector<JsonPath>> _parsed_jsonpaths;
std::vector<JsonPath> _parsed_json_root;
bool _parsed_from_json_root = false; // to avoid parsing json root multiple times

char _value_buffer[4 * 1024 * 1024]; // 4MB
char _parse_buffer[512 * 1024]; // 512KB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2267,6 +2267,12 @@ public class Config extends ConfigBase {
@ConfField
public static long stats_cache_size = 50_0000;

/**
* This config used for ranger cache data mask/row policy
*/
@ConfField
public static long ranger_cache_size = 10000;

/**
* This configuration is used to enable the statistics of query information, which will record
* the access status of databases, tables, and columns, and can be used to guide the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,12 @@ public void analyze(Analyzer analyzer) throws UserException {
}

private void checkLabelName(Analyzer analyzer) throws AnalysisException {
dbFullName = analyzer.getContext().getDatabase();
dbFullName = labelName == null ? null : labelName.getDbName();
if (Strings.isNullOrEmpty(dbFullName)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
dbFullName = analyzer.getContext().getDatabase();
if (Strings.isNullOrEmpty(dbFullName)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
}
}
name = labelName == null ? null : labelName.getLabelName();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog.authorizer.ranger.cache;

import org.apache.doris.analysis.ResourceTypeEnum;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.common.AuthorizationException;
import org.apache.doris.mysql.privilege.CatalogAccessController;
import org.apache.doris.mysql.privilege.DataMaskPolicy;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.RowFilterPolicy;

import java.util.List;
import java.util.Optional;
import java.util.Set;

public abstract class CatalogCacheAccessController implements CatalogAccessController {
public abstract CatalogAccessController getProxyController();

public abstract RangerCache getCache();


@Override
public boolean checkGlobalPriv(UserIdentity currentUser, PrivPredicate wanted) {
return getProxyController().checkGlobalPriv(currentUser, wanted);
}

@Override
public boolean checkCtlPriv(UserIdentity currentUser, String ctl, PrivPredicate wanted) {
return getProxyController().checkCtlPriv(currentUser, ctl, wanted);
}

@Override
public boolean checkDbPriv(UserIdentity currentUser, String ctl, String db, PrivPredicate wanted) {
return getProxyController().checkDbPriv(currentUser, ctl, db, wanted);
}

@Override
public boolean checkTblPriv(UserIdentity currentUser, String ctl, String db, String tbl, PrivPredicate wanted) {
return getProxyController().checkTblPriv(currentUser, ctl, db, tbl, wanted);
}

@Override
public boolean checkResourcePriv(UserIdentity currentUser, String resourceName, PrivPredicate wanted) {
return getProxyController().checkResourcePriv(currentUser, resourceName, wanted);
}

@Override
public boolean checkWorkloadGroupPriv(UserIdentity currentUser, String workloadGroupName, PrivPredicate wanted) {
return getProxyController().checkWorkloadGroupPriv(currentUser, workloadGroupName, wanted);
}

@Override
public void checkColsPriv(UserIdentity currentUser, String ctl, String db, String tbl, Set<String> cols,
PrivPredicate wanted) throws AuthorizationException {
getProxyController().checkColsPriv(currentUser, ctl, db, tbl, cols, wanted);
}

@Override
public boolean checkCloudPriv(UserIdentity currentUser, String resourceName, PrivPredicate wanted,
ResourceTypeEnum type) {
return getProxyController().checkCloudPriv(currentUser, resourceName, wanted, type);
}

@Override
public Optional<DataMaskPolicy> evalDataMaskPolicy(UserIdentity currentUser, String ctl, String db, String tbl,
String col) {
return getCache().getDataMask(new DatamaskCacheKey(currentUser, ctl, db, tbl, col));
}

@Override
public List<? extends RowFilterPolicy> evalRowFilterPolicies(UserIdentity currentUser, String ctl, String db,
String tbl) {
return getCache().getRowFilters(new RowFilterCacheKey(currentUser, ctl, db, tbl));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog.authorizer.ranger.cache;

import org.apache.doris.analysis.UserIdentity;

import com.google.common.base.Objects;

public class DatamaskCacheKey {
private UserIdentity userIdentity;
private String ctl;
private String db;
private String tbl;
private String col;

public DatamaskCacheKey(UserIdentity userIdentity, String ctl, String db, String tbl, String col) {
this.userIdentity = userIdentity;
this.ctl = ctl;
this.db = db;
this.tbl = tbl;
this.col = col;
}

public UserIdentity getUserIdentity() {
return userIdentity;
}

public String getCtl() {
return ctl;
}

public String getDb() {
return db;
}

public String getTbl() {
return tbl;
}

public String getCol() {
return col;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DatamaskCacheKey that = (DatamaskCacheKey) o;
return Objects.equal(userIdentity, that.userIdentity)
&& Objects.equal(ctl, that.ctl) && Objects.equal(db, that.db)
&& Objects.equal(tbl, that.tbl) && Objects.equal(col,
that.col);
}

@Override
public int hashCode() {
return Objects.hashCode(userIdentity, ctl, db, tbl, col);
}

@Override
public String toString() {
return "DatamaskCacheKey{"
+ "userIdentity=" + userIdentity
+ ", ctl='" + ctl + '\''
+ ", db='" + db + '\''
+ ", tbl='" + tbl + '\''
+ ", col='" + col + '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog.authorizer.ranger.cache;

import org.apache.doris.common.Config;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.mysql.privilege.CatalogAccessController;
import org.apache.doris.mysql.privilege.DataMaskPolicy;
import org.apache.doris.mysql.privilege.RowFilterPolicy;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

public class RangerCache {
private static final Logger LOG = LoggerFactory.getLogger(RangerCache.class);

private CatalogAccessController controller;
private LoadingCache<DatamaskCacheKey, Optional<DataMaskPolicy>> datamaskCache = CacheBuilder.newBuilder()
.maximumSize(Config.ranger_cache_size)
.build(new CacheLoader<DatamaskCacheKey, Optional<DataMaskPolicy>>() {
@Override
public Optional<DataMaskPolicy> load(DatamaskCacheKey key) {
return loadDataMask(key);
}
});

private LoadingCache<RowFilterCacheKey, List<? extends RowFilterPolicy>> rowFilterCache = CacheBuilder.newBuilder()
.maximumSize(Config.ranger_cache_size)
.build(new CacheLoader<RowFilterCacheKey, List<? extends RowFilterPolicy>>() {
@Override
public List<? extends RowFilterPolicy> load(RowFilterCacheKey key) {
return loadRowFilter(key);
}
});

public RangerCache() {
}

public void init(CatalogAccessController controller) {
this.controller = controller;
}

private Optional<DataMaskPolicy> loadDataMask(DatamaskCacheKey key) {
Objects.requireNonNull(controller, "controller can not be null");
if (LOG.isDebugEnabled()) {
LOG.debug("load datamask: {}", key);
}
return controller.evalDataMaskPolicy(key.getUserIdentity(), key.getCtl(), key.getDb(), key.getTbl(),
key.getCol());
}

private List<? extends RowFilterPolicy> loadRowFilter(RowFilterCacheKey key) {
Objects.requireNonNull(controller, "controller can not be null");
if (LOG.isDebugEnabled()) {
LOG.debug("load row filter: {}", key);
}
return controller.evalRowFilterPolicies(key.getUserIdentity(), key.getCtl(), key.getDb(), key.getTbl());
}

public void invalidateDataMaskCache() {
datamaskCache.invalidateAll();
}

public void invalidateRowFilterCache() {
rowFilterCache.invalidateAll();
}

public Optional<DataMaskPolicy> getDataMask(DatamaskCacheKey key) {
try {
return datamaskCache.get(key);
} catch (ExecutionException e) {
throw new CacheException("failed to get datamask for:" + key, e);
}
}

public List<? extends RowFilterPolicy> getRowFilters(RowFilterCacheKey key) {
try {
return rowFilterCache.get(key);
} catch (ExecutionException e) {
throw new CacheException("failed to get row filter for:" + key, e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog.authorizer.ranger.cache;

import org.apache.doris.catalog.authorizer.ranger.doris.RangerDorisAccessController;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.ranger.plugin.service.RangerAuthContextListener;

public class RangerCacheInvalidateListener implements RangerAuthContextListener {
private static final Logger LOG = LogManager.getLogger(RangerDorisAccessController.class);

private RangerCache cache;

public RangerCacheInvalidateListener(RangerCache cache) {
this.cache = cache;
}

@Override
public void contextChanged() {
LOG.info("ranger context changed");
cache.invalidateDataMaskCache();
cache.invalidateRowFilterCache();
}
}
Loading

0 comments on commit dc70771

Please sign in to comment.