Skip to content

Elasticsearch Document Update 과정

zerozae edited this page Jan 18, 2024 · 1 revision

Elasticsearch에서 Document를 Update하는 과정을 다뤄보려고 한다. Elasticsearch의 경우 update시 일반적인 update와는 다르게 기존에 존재하는 문서를 제거하고, 새로운 문서를 생성한다고 한다. 따라서 검색성능은 좋지만 수정과 관련된 기능에 대해서는 성능이 별로 좋지 못할 것 같다.라는 생각이 들었고 어떻게 해야할 지 고민하게 되었다.

우선 현재 프로젝트 구조에서 JPA의 Entity와 Elasticsearch의 Document는 별도의 클래스로 분리되어 있으며, Entity를 JPA Repository에 저장시 Elasticsearch의 Document 역시 함께 저장해주는 방식을 채택하고 있기 때문에 수정시에도 Entity가 수정됨에 따라 Elasticsearch의 Document 역시 수정이 되어야 키워드 검색을 진행하게 되었을 때 데이터 정합성을 보장할 수 있을 것이라고 생각했다.

그런데, Entity 하나하나 수정할 때마다 Elasticsearch의 Document 역시 수정하기에는 성능상 큰 문제가 될 것 같다고 판단하여 다른 방안을 모색하던 중 "NHN 엘라스틱서치를 이용한 상품 검색 엔진 개발 일지"라는 한 영상을 보고 아이디어를 얻게 되었다.

1. 적용기

전반적인 흐름에 대해 살펴보면, JPA Entity가 수정됨에 따라 대응되는 Elasticsearch의 Document를 별도로 구현한 Buffer라는 자료구조 클래스에 저장하여 모아놓고 30초 간격으로 스케줄러를 통해 Bulk Update를 진행하는 방식으로 구현하고자 했다.

[ DocumentBuffer ]

public class DocumentBuffer {

    private static final Queue<EventDocument> eventDocuments = new ConcurrentLinkedQueue<>();

    public static void add(EventDocument eventDocument) {
       eventDocuments.add(eventDocument);
    }

    public static List<Object> getAll() {
       return Arrays.asList(eventDocuments.toArray());
    }

    public static void deleteAll() {
       eventDocuments.clear();
    }
}

먼저, DocumentBuffer라는 Document들을 모아놓기 위한 자료구조 클래스를 구현하였다. 기본적인 add, remove 메소드 등이 있다.

[ EventService - updateEvent ]

public EventResponse updateEvent(Long id, EventUpdateRequest request) {
    Event event = getEvent(id);
    EventEdit eventEdit = getEventEdit(request);
    event.updateEvent(eventEdit);

    DocumentBuffer.add(EventDocument.from(event));
    return EventResponse.of(event);
}

event 라는 JPA Entity를 업데이트함에 따라, 대응되는 Document를 DocumentBuffer 내부에 저장시킨다.

@Slf4j
@Component
@RequiredArgsConstructor
public class EventDocumentScheduler {

    private static final int SCHEDULE_UPDATE_CYCLE = 30000;

    private final EventSearchQueryRepository eventSearchQueryRepository;

    @Scheduled(fixedDelay = SCHEDULE_UPDATE_CYCLE)
    public void scheduleUpdateEventDocument() {
       log.info(">>>>> execute scheduleUpdateEventDocument");
       try {
          List<Object> documents = DocumentBuffer.getAll();
          isDocumentsEmpty(documents);
          eventSearchQueryRepository.bulkUpdate(documents);
       } catch (RuntimeException e) {
          log.warn(">>>>> scheduleUpdateEventDocument: Document Empty");
          return;
       } catch (Exception e) {
          log.warn(">>>>> scheduleUpdateEventDocument: failed!!! {}", e);
          return;
       }
       DocumentBuffer.deleteAll();
    }

    private static void isDocumentsEmpty(List<Object> documents) {
       if (documents.isEmpty())
          throw new IllegalArgumentException("scheduleUpdateEventDocument: Document Emtpy");
    }
}

스케줄러를 통해 30초 간격으로 DocumentBuffer에 Document 정보가 들어있는지를 확인하고, Document가 존재하면 해당 Document를 Bulk Update하는 메소드를 호출한다.

[ EventQuerySearchRepository - bulkUpdate ]

public <T> void bulkUpdate(List<T> documents) {
    try {
       List<UpdateQuery> updateQueries = new ArrayList<>();
       for (T document : documents) {
          Document esDocument = elasticsearchOperations.getElasticsearchConverter().mapObject(document);
          UpdateQuery updateQuery = UpdateQuery.builder(esDocument.getId())
             .withDocument(esDocument)
             .withDocAsUpsert(true)
             .build();
          updateQueries.add(updateQuery);
       }
       elasticsearchOperations.bulkUpdate(updateQueries, IndexCoordinates.of("event"));
    } catch (Exception e) {
       throw e;
    }
}

Bulk Update 를 통해 한 번에 Document 들을 update 한다.

2. 결과 확인해보기

[ 업데이트 전 ]

!https://blog.kakaocdn.net/dn/G5o2A/btsDyOuFAwp/FATJIGoy3CkeG8bMxe6kZK/img.png

먼저 업데이트 이전에 스케줄러가 동작하는 로그를 확인해보자. 아직 Entity에 대한 수정이 이루어지지 않았기 때문에 Update 하기 위한 Document가 존재하지 않아 DocumentBuffer 내부가 비어있다는 로그를 확인할 수 있다.

[ 업데이트 후 ]

!https://blog.kakaocdn.net/dn/2k7Q5/btsDyB3sYtN/B6BSQPZipRdywr4eDWoDN1/img.png

이후 업데이트를 진행하게 되면, DocumentBuffer에 모여있던 Document 들이 Bulk 연산을 통해 Update 쿼리가 Elasticsearch로 전달되어 수정이 되는 것을 확인할 수 있다.

3. 생각해볼 점

마지막으로 개선할 부분이나, 생각해볼만한 부분으로는 벌크 연산으로 업데이트 과정 중 서버가 다운되거나 엘라스틱서치에 문제가 발생하여 중간에 업데이트가 끊기게 되었을 때, 서버가 복구되거나, 엘라스틱서치가 정상 동작할 때 업데이트가 되지 못하고 끊긴 데이터들의 재업데이트가 보장되느냐를 생각해볼 수 있을 것 같다.

또한 서비스 규모에 따라 벌크 연산의 주기를 조절할 필요가 있을 것 같다. 데이터가 정말 많고 사용자가 많은 서비스 같은 경우는 데이터의 정합성이 보다 중요하기 때문에, 주기를 좀 더 짧게 조절할 필요가 있을 것이고 현재 프로젝트처럼 규모가 그렇게 크지 않다면 좀 더 넉넉하게 스케줄러를 돌려도 괜찮을 것 같다는 생각이 들었다.