From 887d518463bb6dad33e5a65c8b0c7b0e5d8379e3 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Fri, 20 Dec 2024 20:58:03 +0800 Subject: [PATCH] [improve][client] Make replicateSubscriptionState nullable (#23757) Signed-off-by: Zixuan Liu (cherry picked from commit 3fce3097c76a9c8cb64cf3d8d87f6e050e6cb3a5) Signed-off-by: Zixuan Liu --- .../pulsar/broker/service/ServerCnx.java | 4 +- .../broker/service/SubscriptionOption.java | 2 +- .../nonpersistent/NonPersistentTopic.java | 4 +- .../persistent/PersistentSubscription.java | 20 +++- .../service/persistent/PersistentTopic.java | 23 +++-- .../client/api/ReplicateSubscriptionTest.java | 96 +++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 2 +- .../impl/conf/ConsumerConfigurationData.java | 20 +++- .../client/impl/ConsumerBuilderImplTest.java | 36 ++++++- .../pulsar/common/protocol/Commands.java | 8 +- 10 files changed, 186 insertions(+), 29 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index aafcb5eeb486d..b0926ee31d9f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -984,8 +984,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { ? subscribe.getStartMessageRollbackDurationSec() : -1; final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null; - final boolean isReplicated = subscribe.hasReplicateSubscriptionState() - && subscribe.isReplicateSubscriptionState(); + final Boolean isReplicated = + subscribe.hasReplicateSubscriptionState() ? subscribe.isReplicateSubscriptionState() : null; final boolean forceTopicCreation = subscribe.isForceTopicCreation(); final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java index d7503f6e90d4a..8d0e7ab32803c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java @@ -45,7 +45,7 @@ public class SubscriptionOption { private boolean readCompacted; private CommandSubscribe.InitialPosition initialPosition; private long startMessageRollbackDurationSec; - private boolean replicatedSubscriptionStateArg; + private Boolean replicatedSubscriptionStateArg; private KeySharedMeta keySharedMeta; private Optional> subscriptionProperties; private long consumerEpoch; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 62ccc3a259586..d63239782d203 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -246,7 +246,7 @@ public CompletableFuture subscribe(SubscriptionOption option) { option.getSubType(), option.getPriorityLevel(), option.getConsumerName(), option.isDurable(), option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(), option.getInitialPosition(), - option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(), + option.getStartMessageRollbackDurationSec(), option.getReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null)); } @@ -269,7 +269,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St MessageId startMessageId, Map metadata, boolean readCompacted, InitialPosition initialPosition, long resetStartMessageBackInSec, - boolean replicateSubscriptionState, + Boolean replicateSubscriptionState, KeySharedMeta keySharedMeta, Map subscriptionProperties) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 937e9ba1fd1b9..f9a347aef9567 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -119,9 +119,11 @@ public class PersistentSubscription extends AbstractSubscription implements Subs static { REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); } + private volatile Boolean replicatedControlled; - static Map getBaseCursorProperties(boolean isReplicated) { - return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES; + static Map getBaseCursorProperties(Boolean isReplicated) { + return isReplicated != null && isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : + NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES; } static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) { @@ -129,19 +131,21 @@ static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) { } public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, - boolean replicated) { + Boolean replicated) { this(topic, subscriptionName, cursor, replicated, Collections.emptyMap()); } public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, - boolean replicated, Map subscriptionProperties) { + Boolean replicated, Map subscriptionProperties) { this.topic = topic; this.cursor = cursor; this.topicName = topic.getName(); this.subName = subscriptionName; this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString(); this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this); - this.setReplicated(replicated); + if (replicated != null) { + this.setReplicated(replicated); + } this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() @@ -179,6 +183,7 @@ public boolean isReplicated() { } public boolean setReplicated(boolean replicated) { + replicatedControlled = replicated; ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig(); if (!replicated || !config.isEnableReplicatedSubscriptions()) { @@ -1230,4 +1235,9 @@ public boolean checkIfPendingAckStoreInit() { public PendingAckHandle getPendingAckHandle() { return pendingAckHandle; } + + @VisibleForTesting + public Boolean getReplicatedControlled() { + return replicatedControlled; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 14c77e77e02ea..37d32782d0401 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -396,7 +396,7 @@ private CompletableFuture removeOrphanReplicationCursors() { } private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, - boolean replicated, Map subscriptionProperties) { + Boolean replicated, Map subscriptionProperties) { checkNotNull(compactedTopic); if (subscriptionName.equals(COMPACTION_SUBSCRIPTION)) { return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor); @@ -685,7 +685,7 @@ public CompletableFuture subscribe(SubscriptionOption option) { option.getSubType(), option.getPriorityLevel(), option.getConsumerName(), option.isDurable(), option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(), option.getInitialPosition(), option.getStartMessageRollbackDurationSec(), - option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), + option.getReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(Collections.emptyMap()), option.getConsumerEpoch()); } @@ -696,7 +696,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St Map metadata, boolean readCompacted, InitialPosition initialPosition, long startMessageRollbackDurationSec, - boolean replicatedSubscriptionStateArg, + Boolean replicatedSubscriptionStateArg, KeySharedMeta keySharedMeta, Map subscriptionProperties, long consumerEpoch) { @@ -706,12 +706,9 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St } return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> { - boolean replicatedSubscriptionState = replicatedSubscriptionStateArg; - - if (replicatedSubscriptionState + if (replicatedSubscriptionStateArg != null && replicatedSubscriptionStateArg && !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) { log.warn("[{}] Replicated Subscription is disabled by broker.", getName()); - replicatedSubscriptionState = false; } if (subType == SubType.Key_Shared @@ -777,9 +774,9 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St lock.readLock().unlock(); } - CompletableFuture subscriptionFuture = isDurable ? // - getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, - replicatedSubscriptionState, subscriptionProperties) + CompletableFuture subscriptionFuture = isDurable + ? getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, + replicatedSubscriptionStateArg, subscriptionProperties) : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, startMessageRollbackDurationSec, readCompacted, subscriptionProperties); @@ -865,7 +862,9 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs } private CompletableFuture getDurableSubscription(String subscriptionName, - InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated, + InitialPosition initialPosition, + long startMessageRollbackDurationSec, + Boolean replicated, Map subscriptionProperties) { CompletableFuture subscriptionFuture = new CompletableFuture<>(); if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) { @@ -897,7 +896,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) { return; } } - if (replicated && !subscription.isReplicated()) { + if (replicated != null && replicated && !subscription.isReplicated()) { // Flip the subscription state subscription.setReplicated(replicated); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java new file mode 100644 index 0000000000000..3175a33ced5b8 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.client.impl.ConsumerBuilderImpl; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class ReplicateSubscriptionTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + } + + @DataProvider + public Object[] replicateSubscriptionState() { + return new Object[]{ + Boolean.TRUE, + Boolean.FALSE, + null + }; + } + + @Test(dataProvider = "replicateSubscriptionState") + public void testReplicateSubscriptionState(Boolean replicateSubscriptionState) + throws Exception { + String topic = "persistent://my-property/my-ns/" + System.nanoTime(); + String subName = "sub-" + System.nanoTime(); + ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName(subName); + if (replicateSubscriptionState != null) { + consumerBuilder.replicateSubscriptionState(replicateSubscriptionState); + } + ConsumerBuilderImpl consumerBuilderImpl = (ConsumerBuilderImpl) consumerBuilder; + assertEquals(consumerBuilderImpl.getConf().getReplicateSubscriptionState(), replicateSubscriptionState); + @Cleanup + Consumer ignored = consumerBuilder.subscribe(); + CompletableFuture> topicIfExists = pulsar.getBrokerService().getTopicIfExists(topic); + assertThat(topicIfExists) + .succeedsWithin(3, TimeUnit.SECONDS) + .matches(optionalTopic -> { + assertTrue(optionalTopic.isPresent()); + Topic topicRef = optionalTopic.get(); + Subscription subscription = topicRef.getSubscription(subName); + assertNotNull(subscription); + assertTrue(subscription instanceof PersistentSubscription); + PersistentSubscription persistentSubscription = (PersistentSubscription) subscription; + assertEquals(persistentSubscription.getReplicatedControlled(), replicateSubscriptionState); + return true; + }); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 90ac520dce6c0..ae07aac01ea46 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -851,7 +851,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { setClientCnx(cnx); ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted, - conf.isReplicateSubscriptionState(), + conf.getReplicateSubscriptionState(), InitialPosition.valueOf(subscriptionInitialPosition.getValue()), startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(), // Use the current epoch to subscribe. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index ab2527b493236..c8d47914cf86c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -20,7 +20,9 @@ import static com.google.common.base.Preconditions.checkArgument; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Sets; +import io.swagger.annotations.ApiModelProperty; import java.io.Serializable; import java.util.Map; import java.util.Set; @@ -145,7 +147,13 @@ public int getMaxPendingChuckedMessage() { private long autoUpdatePartitionsIntervalSeconds = 60; - private boolean replicateSubscriptionState = false; + @ApiModelProperty( + name = "replicateSubscriptionState", + value = "If `replicateSubscriptionState` is enabled, a subscription state is replicated to geo-replicated" + + " clusters." + ) + @JsonProperty(access = JsonProperty.Access.READ_WRITE) + private Boolean replicateSubscriptionState; private boolean resetIncludeHead = false; @@ -185,4 +193,14 @@ public ConsumerConfigurationData clone() { throw new RuntimeException("Failed to clone ConsumerConfigurationData"); } } + + /** + * Backward compatibility with the old `replicateSubscriptionState` field. + * @deprecated Using {@link #getReplicateSubscriptionState()} instead. + */ + @JsonIgnore + @Deprecated + public boolean isReplicateSubscriptionState() { + return replicateSubscriptionState != null && replicateSubscriptionState; + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index a2c76a8956ea8..4763cc208f5e3 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -460,7 +460,7 @@ public void testLoadConf() throws Exception { assertTrue(configurationData.isRetryEnable()); assertFalse(configurationData.isAutoUpdatePartitions()); assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 2); - assertTrue(configurationData.isReplicateSubscriptionState()); + assertEquals(configurationData.getReplicateSubscriptionState(), Boolean.TRUE); assertTrue(configurationData.isResetIncludeHead()); assertTrue(configurationData.isBatchIndexAckEnabled()); assertTrue(configurationData.isAckReceiptEnabled()); @@ -518,7 +518,7 @@ public void testLoadConfNotModified() { assertFalse(configurationData.isRetryEnable()); assertTrue(configurationData.isAutoUpdatePartitions()); assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 60); - assertFalse(configurationData.isReplicateSubscriptionState()); + assertNull(configurationData.getReplicateSubscriptionState()); assertFalse(configurationData.isResetIncludeHead()); assertFalse(configurationData.isBatchIndexAckEnabled()); assertFalse(configurationData.isAckReceiptEnabled()); @@ -537,6 +537,38 @@ public void testLoadConfNotModified() { assertNull(configurationData.getPayloadProcessor()); } + @Test + public void testReplicateSubscriptionState() { + ConsumerBuilderImpl consumerBuilder = createConsumerBuilder(); + assertNull(consumerBuilder.getConf().getReplicateSubscriptionState()); + + consumerBuilder.replicateSubscriptionState(true); + assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.TRUE); + + consumerBuilder.replicateSubscriptionState(false); + assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.FALSE); + + Map conf = new HashMap<>(); + consumerBuilder = createConsumerBuilder(); + consumerBuilder.loadConf(conf); + assertNull(consumerBuilder.getConf().getReplicateSubscriptionState()); + + conf.put("replicateSubscriptionState", true); + consumerBuilder = createConsumerBuilder(); + consumerBuilder.loadConf(conf); + assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.TRUE); + + conf.put("replicateSubscriptionState", false); + consumerBuilder = createConsumerBuilder(); + consumerBuilder.loadConf(conf); + assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.FALSE); + + conf.put("replicateSubscriptionState", null); + consumerBuilder = createConsumerBuilder(); + consumerBuilder.loadConf(conf); + assertNull(consumerBuilder.getConf().getReplicateSubscriptionState()); + } + private ConsumerBuilderImpl createConsumerBuilder() { ConsumerBuilderImpl consumerBuilder = new ConsumerBuilderImpl<>(null, Schema.BYTES); Map properties = new HashMap<>(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index dadaa56ae603d..0201213a16210 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -551,7 +551,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, - Map metadata, boolean readCompacted, boolean isReplicated, + Map metadata, boolean readCompacted, Boolean isReplicated, InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist) { return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, @@ -562,7 +562,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, - Map metadata, boolean readCompacted, boolean isReplicated, + Map metadata, boolean readCompacted, Boolean isReplicated, InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy, Map subscriptionProperties, long consumerEpoch) { @@ -578,9 +578,11 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu .setDurable(isDurable) .setReadCompacted(readCompacted) .setInitialPosition(subscriptionInitialPosition) - .setReplicateSubscriptionState(isReplicated) .setForceTopicCreation(createTopicIfDoesNotExist) .setConsumerEpoch(consumerEpoch); + if (isReplicated != null) { + subscribe.setReplicateSubscriptionState(isReplicated); + } if (subscriptionProperties != null && !subscriptionProperties.isEmpty()) { List keyValues = new ArrayList<>();