Skip to content

Commit

Permalink
[SYNCOPE-1776] Using ES BulkRequest for re-index (#515)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilgrosso authored Sep 13, 2023
1 parent c6b2216 commit ca2f76e
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void any(final AnyLifecycleEvent<Any<?>> event) throws IOException {
IndexRequest<Map<String, Object>> request = new IndexRequest.Builder<Map<String, Object>>().
index(ElasticsearchUtils.getAnyIndex(event.getDomain(), event.getAny().getType().getKind())).
id(event.getAny().getKey()).
document(elasticsearchUtils.document(event.getAny(), event.getDomain())).
document(elasticsearchUtils.document(event.getAny())).
build();
IndexResponse response = client.index(request);
LOG.debug("Index successfully created or updated for {}: {}", event.getAny(), response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.syncope.core.persistence.api.entity.anyobject.AnyObject;
import org.apache.syncope.core.persistence.api.entity.group.Group;
import org.apache.syncope.core.persistence.api.entity.user.User;
import org.apache.syncope.core.spring.security.AuthContextUtils;
import org.springframework.transaction.annotation.Transactional;

/**
Expand Down Expand Up @@ -77,27 +76,20 @@ public ElasticsearchUtils(
* Returns the document specialized with content from the provided any.
*
* @param any user, group or any object to index
* @param domain tenant information
* @return document specialized with content from the provided any
* @throws IOException in case of errors
*/
@Transactional
public Map<String, Object> document(final Any<?> any, final String domain) throws IOException {
Set<String> resources = new HashSet<>();
List<String> dynRealms = new ArrayList<>();
AuthContextUtils.callAsAdmin(domain, () -> {
resources.addAll(any instanceof User
? userDAO.findAllResourceKeys(any.getKey())
: any instanceof AnyObject
? anyObjectDAO.findAllResourceKeys(any.getKey())
: groupDAO.findAllResourceKeys(any.getKey()));
dynRealms.addAll(any instanceof User
? userDAO.findDynRealms(any.getKey())
: any instanceof AnyObject
? anyObjectDAO.findDynRealms(any.getKey())
: groupDAO.findDynRealms(any.getKey()));
return null;
});
public Map<String, Object> document(final Any<?> any) {
Collection<String> resources = any instanceof User
? userDAO.findAllResourceKeys(any.getKey())
: any instanceof AnyObject
? anyObjectDAO.findAllResourceKeys(any.getKey())
: groupDAO.findAllResourceKeys(any.getKey());
Collection<String> dynRealms = any instanceof User
? userDAO.findDynRealms(any.getKey())
: any instanceof AnyObject
? anyObjectDAO.findDynRealms(any.getKey())
: groupDAO.findDynRealms(any.getKey());

Map<String, Object> builder = new HashMap<>();
builder.put("id", any.getKey());
Expand All @@ -118,42 +110,34 @@ public Map<String, Object> document(final Any<?> any, final String domain) throw
AnyObject anyObject = ((AnyObject) any);
builder.put("name", anyObject.getName());

Collection<String> memberships = AuthContextUtils.callAsAdmin(
domain, () -> anyObjectDAO.findAllGroupKeys(anyObject));
builder.put("memberships", memberships);
builder.put("memberships", anyObjectDAO.findAllGroupKeys(anyObject));

List<String> relationships = new ArrayList<>();
List<String> relationshipTypes = new ArrayList<>();
AuthContextUtils.callAsAdmin(domain, () -> {
anyObjectDAO.findAllRelationships(anyObject).forEach(relationship -> {
relationships.add(relationship.getRightEnd().getKey());
relationshipTypes.add(relationship.getType().getKey());
});
return null;
anyObjectDAO.findAllRelationships(anyObject).forEach(relationship -> {
relationships.add(relationship.getRightEnd().getKey());
relationshipTypes.add(relationship.getType().getKey());
});
builder.put("relationships", relationships);
builder.put("relationshipTypes", relationshipTypes);

ElasticsearchUtils.this.customizeDocument(builder, anyObject, domain);
ElasticsearchUtils.this.customizeDocument(builder, anyObject);
} else if (any instanceof Group) {
Group group = ((Group) any);
builder.put("name", group.getName());
Optional.ofNullable(group.getUserOwner()).ifPresent(uo -> builder.put("userOwner", uo.getKey()));
Optional.ofNullable(group.getGroupOwner()).ifPresent(go -> builder.put("groupOwner", go.getKey()));

Set<String> members = AuthContextUtils.callAsAdmin(domain, () -> {
Set<String> m = new HashSet<>();
m.addAll(groupDAO.findUMemberships(group).stream().
map(membership -> membership.getLeftEnd().getKey()).collect(Collectors.toList()));
m.addAll(groupDAO.findUDynMembers(group));
m.addAll(groupDAO.findAMemberships(group).stream().
map(membership -> membership.getLeftEnd().getKey()).collect(Collectors.toList()));
m.addAll(groupDAO.findADynMembers(group));
return m;
});
Set<String> members = new HashSet<>();
members.addAll(groupDAO.findUMemberships(group).stream().
map(membership -> membership.getLeftEnd().getKey()).collect(Collectors.toList()));
members.addAll(groupDAO.findUDynMembers(group));
members.addAll(groupDAO.findAMemberships(group).stream().
map(membership -> membership.getLeftEnd().getKey()).collect(Collectors.toList()));
members.addAll(groupDAO.findADynMembers(group));
builder.put("members", members);

ElasticsearchUtils.this.customizeDocument(builder, group, domain);
ElasticsearchUtils.this.customizeDocument(builder, group);
} else if (any instanceof User) {
User user = ((User) any);
builder.put("username", user.getUsername());
Expand All @@ -167,18 +151,14 @@ public Map<String, Object> document(final Any<?> any, final String domain) throw

List<String> roles = new ArrayList<>();
Set<String> privileges = new HashSet<>();
AuthContextUtils.callAsAdmin(domain, () -> {
userDAO.findAllRoles(user).forEach(role -> {
roles.add(role.getKey());
privileges.addAll(role.getPrivileges().stream().map(Privilege::getKey).collect(Collectors.toSet()));
});
return null;
userDAO.findAllRoles(user).forEach(role -> {
roles.add(role.getKey());
privileges.addAll(role.getPrivileges().stream().map(Privilege::getKey).collect(Collectors.toSet()));
});
builder.put("roles", roles);
builder.put("privileges", privileges);

Collection<String> memberships = AuthContextUtils.callAsAdmin(domain, () -> userDAO.findAllGroupKeys(user));
builder.put("memberships", memberships);
builder.put("memberships", userDAO.findAllGroupKeys(user));

List<String> relationships = new ArrayList<>();
Set<String> relationshipTypes = new HashSet<>();
Expand All @@ -189,7 +169,7 @@ public Map<String, Object> document(final Any<?> any, final String domain) throw
builder.put("relationships", relationships);
builder.put("relationshipTypes", relationshipTypes);

customizeDocument(builder, user, domain);
customizeDocument(builder, user);
}

for (PlainAttr<?> plainAttr : any.getPlainAttrs()) {
Expand All @@ -204,18 +184,13 @@ public Map<String, Object> document(final Any<?> any, final String domain) throw
return builder;
}

protected void customizeDocument(
final Map<String, Object> builder, final AnyObject anyObject, final String domain)
throws IOException {
protected void customizeDocument(final Map<String, Object> builder, final AnyObject anyObject) {
}

protected void customizeDocument(
final Map<String, Object> builder, final Group group, final String domain)
throws IOException {
protected void customizeDocument(final Map<String, Object> builder, final Group group) {
}

protected void customizeDocument(final Map<String, Object> builder, final User user, final String domain)
throws IOException {
protected void customizeDocument(final Map<String, Object> builder, final User user) {
}

public Map<String, Object> document(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.indices.IndexSettings;
import java.io.IOException;
import java.util.Map;
import org.apache.syncope.common.lib.types.AnyTypeKind;
import org.apache.syncope.core.persistence.api.dao.AnyDAO;
import org.apache.syncope.core.persistence.api.dao.AnyObjectDAO;
Expand Down Expand Up @@ -112,62 +111,75 @@ protected String doExecute(final boolean dryRun, final String executor, final Jo
AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT, anyObjectSettings(), anyObjectMapping());

int users = userDAO.count();
setStatus("Indexing " + users + " users...");
String uindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER);
setStatus("Indexing " + users + " users under " + uindex + "...");
for (int page = 1; page <= (users / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
BulkRequest.Builder bulkRequest = new BulkRequest.Builder();

for (String user : userDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
IndexRequest<Map<String, Object>> request = new IndexRequest.Builder<Map<String, Object>>().
index(ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.USER)).
bulkRequest.operations(op -> op.index(idx -> idx.
index(uindex).
id(user).
document(utils.document(userDAO.find(user), AuthContextUtils.getDomain())).
build();
try {
IndexResponse response = client.index(request);
LOG.debug("Index successfully created for {}: {}", user, response);
} catch (Exception e) {
LOG.error("Could not create index for {} {}", AnyTypeKind.USER, user, e);
}
document(utils.document(userDAO.find(user)))));
}

try {
BulkResponse response = client.bulk(bulkRequest.build());
LOG.debug("Index successfully created for {} [{}/{}]: {}",
uindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response);
} catch (Exception e) {
LOG.error("Could not create index for {} [{}/{}]: {}",
uindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
}
}

int groups = groupDAO.count();
setStatus("Indexing " + groups + " groups...");
String gindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP);
setStatus("Indexing " + groups + " groups under " + gindex + "...");
for (int page = 1; page <= (groups / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
BulkRequest.Builder bulkRequest = new BulkRequest.Builder();

for (String group : groupDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
IndexRequest<Map<String, Object>> request = new IndexRequest.Builder<Map<String, Object>>().
index(ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.GROUP)).
bulkRequest.operations(op -> op.index(idx -> idx.
index(gindex).
id(group).
document(utils.document(groupDAO.find(group), AuthContextUtils.getDomain())).
build();
try {
IndexResponse response = client.index(request);
LOG.debug("Index successfully created for {}: {}", group, response);
} catch (Exception e) {
LOG.error("Could not create index for {} {}", AnyTypeKind.GROUP, group, e);
}
document(utils.document(groupDAO.find(group)))));
}

try {
BulkResponse response = client.bulk(bulkRequest.build());
LOG.debug("Index successfully created for {} [{}/{}]: {}",
gindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response);
} catch (Exception e) {
LOG.error("Could not create index for {} [{}/{}]: {}",
gindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
}
}

int anyObjects = anyObjectDAO.count();
setStatus("Indexing " + anyObjects + " any objects...");
String aindex = ElasticsearchUtils.getAnyIndex(AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT);
setStatus("Indexing " + anyObjects + " any objects under " + aindex + "...");
for (int page = 1; page <= (anyObjects / AnyDAO.DEFAULT_PAGE_SIZE) + 1; page++) {
BulkRequest.Builder bulkRequest = new BulkRequest.Builder();

for (String anyObject : anyObjectDAO.findAllKeys(page, AnyDAO.DEFAULT_PAGE_SIZE)) {
IndexRequest<Map<String, Object>> request = new IndexRequest.Builder<Map<String, Object>>().
index(ElasticsearchUtils.getAnyIndex(
AuthContextUtils.getDomain(), AnyTypeKind.ANY_OBJECT)).
bulkRequest.operations(op -> op.index(idx -> idx.
index(aindex).
id(anyObject).
document(utils.document(anyObjectDAO.find(anyObject), AuthContextUtils.getDomain())).
build();
try {
IndexResponse response = client.index(request);
LOG.debug("Index successfully created for {}: {}", anyObject, response);
} catch (Exception e) {
LOG.error("Could not create index for {} {}", AnyTypeKind.ANY_OBJECT, anyObject, e);
}
document(utils.document(anyObjectDAO.find(anyObject)))));
}

try {
BulkResponse response = client.bulk(bulkRequest.build());
LOG.debug("Index successfully created for {} [{}/{}]: {}",
aindex, page, AnyDAO.DEFAULT_PAGE_SIZE, response);
} catch (Exception e) {
LOG.error("Could not create index for {} [{}/{}]: {}",
aindex, page, AnyDAO.DEFAULT_PAGE_SIZE, e);
}
}

indexManager.createAuditIndex(
AuthContextUtils.getDomain(), auditSettings(), auditMapping());
indexManager.createAuditIndex(AuthContextUtils.getDomain(), auditSettings(), auditMapping());

setStatus("Rebuild indexes for domain " + AuthContextUtils.getDomain() + " successfully completed");
} catch (Exception e) {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ under the License.

<slf4j.version>1.7.36</slf4j.version>

<elasticsearch.version>8.9.2</elasticsearch.version>
<elasticsearch.version>8.10.0</elasticsearch.version>

<log4j2.version>2.20.0</log4j2.version>
<disruptor.version>3.4.4</disruptor.version>
Expand Down

0 comments on commit ca2f76e

Please sign in to comment.