Skip to content

Commit

Permalink
(fix #4054) synchronize lazy initialization in ParquetBucketMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
clairemcginty authored Oct 20, 2021
1 parent ae18337 commit b36c57e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,23 @@ private K extractAvroKey(V value) {

// FIXME: what about `Option[T]`
// Extracts the sort key from a Scala value by walking the lazily-initialized
// chain of reflective getters (one getter per segment of the key path).
// Throws IllegalStateException if any getter in the chain cannot be invoked.
private K extractScalaKey(V value) {
  Object obj = value;
  for (Method getter : getOrInitGetters(value.getClass())) {
    try {
      obj = getter.invoke(obj);
    } catch (IllegalAccessException | InvocationTargetException e) {
      // Preserve the reflective failure as the cause so the real error
      // (e.g. the getter's own thrown exception) is not lost from the trace.
      throw new IllegalStateException(
          String.format("Failed to get field %s from class %s", getter.getName(), obj), e);
    }
  }
  // Safe by construction: the final getter in the chain yields the key type.
  @SuppressWarnings("unchecked")
  K key = (K) obj;
  return key;
}

private synchronized Method[] getOrInitGetters(Class<?> cls) {
if (getters == null) {
getters = new Method[keyPath.length];
Class<?> cls = value.getClass();
for (int i = 0; i < keyPath.length; i++) {
Method getter = null;
try {
Expand All @@ -173,18 +187,7 @@ private K extractScalaKey(V value) {
cls = getter.getReturnType();
}
}
Object obj = value;
for (Method getter : getters) {
try {
obj = getter.invoke(obj);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new IllegalStateException(
String.format("Failed to get field %s from class %s", getter.getName(), obj));
}
}
@SuppressWarnings("unchecked")
K key = (K) obj;
return key;
return getters;
}

private static String[] toKeyPath(String keyField) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,15 @@ class ParquetEndToEndTest extends PipelineSpec {
ParquetAvroSortedBucketIO
.write(classOf[CharSequence], "user", eventSchema)
.to(eventsDir.toString)
.withNumBuckets(1)
)
sc1
.parallelize(avroUsers)
.saveAsSortedBucket(
ParquetAvroSortedBucketIO
.write(classOf[String], "name", userSchema)
.to(usersDir.toString)
.withNumBuckets(1)
)
sc1.run()

Expand Down Expand Up @@ -142,13 +144,15 @@ class ParquetEndToEndTest extends PipelineSpec {
ParquetTypeSortedBucketIO
.write[String, Event]("user")
.to(eventsDir.toString)
.withNumBuckets(1)
)
sc1
.parallelize(users)
.saveAsSortedBucket(
ParquetTypeSortedBucketIO
.write[String, User]("name")
.to(usersDir.toString)
.withNumBuckets(1)
)
sc1.run()

Expand Down Expand Up @@ -193,6 +197,7 @@ class ParquetEndToEndTest extends PipelineSpec {
ParquetAvroSortedBucketIO
.write(classOf[CharSequence], "name", classOf[AvroGeneratedUser])
.to(usersDir.toString)
.withNumBuckets(1)
)
sc1.run()

Expand Down

0 comments on commit b36c57e

Please sign in to comment.