Skip to content

Commit

Permalink
fix deadlocks when producer is closed from inside send callback (#176)
Browse files Browse the repository at this point in the history
Co-authored-by: Radai Rosenblatt <[email protected]>
  • Loading branch information
radai-rosenblatt and Radai Rosenblatt authored Jun 15, 2020
1 parent 157ed93 commit c1b3250
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand Down Expand Up @@ -102,6 +105,78 @@ record = new ProducerRecord<>(topic, 0, key, value);
mario.close();
}

/**
 * Regression test for deadlocks when the producer is closed from inside a send callback.
 *
 * <p>Sends a record that exceeds the broker-side size limit on a dedicated thread, and calls
 * {@code producer.close()} from within the send callback. The test then verifies that:
 * <ul>
 *   <li>the sending thread terminates within a minute (i.e. close() did not deadlock), and</li>
 *   <li>the failure captured from the send has a {@link RecordTooLargeException} root cause.</li>
 * </ul>
 * The callback is expected to run on the thread that called send(); if it runs on any other
 * thread an {@link IllegalStateException} is recorded instead and the final assertion fails.
 *
 * @throws Exception if topic creation fails or the test thread is interrupted while joining
 */
@Test
public void testCloseFromProduceCallbackOnSenderThread() throws Exception {
  String topic = "testCloseFromProduceCallbackOnSenderThread";
  createTopic(topic, 1);

  Random random = new Random(666);
  Properties extra = new Properties();
  extra.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "" + 50000000); //~50MB (larger than broker-size setting)
  extra.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
  extra.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  extra.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  Properties baseProducerConfig = getProducerProperties(extra);
  LiKafkaInstrumentedProducerImpl<byte[], byte[]> producer = new LiKafkaInstrumentedProducerImpl<byte[], byte[]>(
      baseProducerConfig,
      Collections.emptyMap(),
      (baseConfig, overrideConfig) -> new LiKafkaProducerImpl<byte[], byte[]>(LiKafkaClientsUtils.getConsolidatedProperties(baseConfig, overrideConfig)),
      () -> "bogus",
      10 //don't wait for a mario connection
  );

  byte[] key = new byte[3000];
  byte[] value = new byte[49000000];
  random.nextBytes(key);
  random.nextBytes(value); //random data is incompressible, making sure our request is large
  ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, value);

  //holds the first failure observed by the test thread (or its callback)
  AtomicReference<Throwable> issueRef = new AtomicReference<>();
  Thread testThread = new Thread(() -> {
    try {
      final Thread ourThread = Thread.currentThread();
      Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
        //we expect a RecordTooLargeException. we also expect this to happen
        //on the same thread.
        if (Thread.currentThread() != ourThread) {
          issueRef.compareAndSet(null,
              new IllegalStateException("completion did not happen on caller thread by " + Thread.currentThread().getName())
          );
        }
        //closing from inside the callback is the scenario under test
        producer.close(1, TimeUnit.SECONDS);
      });
      //the result is irrelevant; we only wait for the send to complete (it is expected to fail)
      future.get(1, TimeUnit.MINUTES);
    } catch (Throwable anything) {
      issueRef.compareAndSet(null, anything);
    }
  }, "testCloseFromProduceCallbackOnSenderThread-thread");
  testThread.setDaemon(true); //a deadlocked thread must not keep the JVM alive
  testThread.setUncaughtExceptionHandler((t, e) -> issueRef.compareAndSet(null, e));
  testThread.start();

  testThread.join(TimeUnit.MINUTES.toMillis(1));
  Thread.State state = testThread.getState();
  Assert.assertEquals(
      state,
      Thread.State.TERMINATED,
      "thread was expected to finish, instead its " + state
  );
  Throwable issue = issueRef.get();
  //fail with a readable message (rather than an NPE out of getRootCause) if the send did not fail at all
  Assert.assertNotNull(issue, "expected the send to fail with RecordTooLargeException, but no failure was captured");
  Throwable root = Throwables.getRootCause(issue);
  Assert.assertTrue(root instanceof RecordTooLargeException, root.getMessage());
}

private void createTopic(String topicName, int numPartitions) throws Exception {
try (AdminClient adminClient = createRawAdminClient(null)) {
adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, numPartitions, (short) 1))).all().get(1, TimeUnit.MINUTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
Expand Down Expand Up @@ -54,7 +53,8 @@ public class LiKafkaInstrumentedProducerImpl<K, V> implements DelegatingProducer
private static final String BOUNDED_FLUSH_THREAD_PREFIX = "Bounded-Flush-Thread-";

private final long initialConnectionTimeoutMs;
private final ReadWriteLock delegateLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock delegateLock = new ReentrantReadWriteLock();
private final Object closeLock = new Object();
private final Properties baseConfig;
private final Map<String, String> libraryVersions;
private final ProducerFactory<K, V> producerFactory;
Expand Down Expand Up @@ -198,6 +198,10 @@ public Producer<K, V> getDelegate() {
private boolean recreateDelegate(boolean abortIfExists) {
delegateLock.writeLock().lock();
try {
if (isClosed()) {
LOG.debug("this producer has been closed, not creating a new delegate");
return false;
}
Producer<K, V> prevProducer = delegate;
if (prevProducer != null) {
if (abortIfExists) {
Expand Down Expand Up @@ -312,6 +316,8 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
verifyOpen();

//the callback may call producer.close(), which needs the write lock. a ReentrantReadWriteLock
//read lock cannot be upgraded, so close() releases this thread's read holds before taking the write lock
delegateLock.readLock().lock();
try {
return delegate.send(record, callback);
Expand Down Expand Up @@ -444,15 +450,38 @@ private boolean proceedClosing() {
if (isClosed()) {
return false;
}
delegateLock.writeLock().lock();
try {
synchronized (closeLock) {
if (isClosed()) {
return false;
return false; //someone beat us to it
}
int holds = delegateLock.getReadHoldCount(); //this is for our thread
ReentrantReadWriteLock.ReadLock readLock = delegateLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = delegateLock.writeLock();
if (holds > 0) { //do we own a read lock ?
for (int i = 0; i < holds; i++) {
readLock.unlock();
}
//at this point we no longer hold a read lock, but any number of other
//readers/writers may slip past us
}
try {
writeLock.lock(); //wait for a write lock
try {
if (isClosed()) {
return false; //some other writer may have beaten us again
}
closedAt = System.currentTimeMillis();
return true;
} finally {
writeLock.unlock();
}
} finally {
if (holds > 0) { //restore our read lock holds (if we had any)
for (int i = 0; i < holds; i++) {
readLock.lock();
}
}
}
closedAt = System.currentTimeMillis();
return true;
} finally {
delegateLock.writeLock().unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.linkedin.kafka.clients.auditing.AuditType;
import com.linkedin.kafka.clients.auditing.Auditor;
import com.linkedin.kafka.clients.auditing.NoOpAuditor;
import com.linkedin.kafka.clients.largemessage.LargeMessageCallback;
import com.linkedin.kafka.clients.largemessage.LargeMessageSegment;
import com.linkedin.kafka.clients.largemessage.MessageSplitter;
Expand Down Expand Up @@ -469,19 +470,23 @@ public void close(long timeout, TimeUnit timeUnit) {
long deadlineTimeMs = startTimeMs + budgetMs;

_closed = true;
synchronized (_numThreadsInSend) {
long remainingMs = deadlineTimeMs - System.currentTimeMillis();
while (_numThreadsInSend.get() > 0 && remainingMs > 0) {
try {
_numThreadsInSend.wait(remainingMs);
} catch (InterruptedException e) {
LOG.error("Interrupted when there are still {} sender threads.", _numThreadsInSend.get());
break;

//wait for all producing threads to clear the auditor
//if there's a meaningful auditor.
if (!(_auditor instanceof NoOpAuditor)) {
synchronized (_numThreadsInSend) {
long remainingMs = deadlineTimeMs - System.currentTimeMillis();
while (_numThreadsInSend.get() > 0 && remainingMs > 0) {
try {
_numThreadsInSend.wait(remainingMs);
} catch (InterruptedException e) {
LOG.error("Interrupted when there are still {} sender threads.", _numThreadsInSend.get());
break;
}
remainingMs = deadlineTimeMs - System.currentTimeMillis();
}
remainingMs = deadlineTimeMs - System.currentTimeMillis();
}
}

_auditor.close(Math.max(0, deadlineTimeMs - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
_producer.close(Math.max(0, deadlineTimeMs - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
LOG.info("LiKafkaProducer shutdown complete in {} millis", (System.currentTimeMillis() - startTimeMs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ private static String fishForClientId(Map<MetricName, ? extends Metric> metrics)

/**
* Special header keys have a "_" prefix and are managed internally by the clients.
* @param headers
* @return
* @param headers kafka headers object
* @return any "special" headers contained in the argument
*/
public static Map<String, byte[]> fetchSpecialHeaders(Headers headers) {
Map<String, byte[]> map = new HashMap<>();
Expand Down

0 comments on commit c1b3250

Please sign in to comment.