diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml index bf44c51b6ad02..87d8cd7cf9a78 100644 --- a/.github/workflows/pulsar-ci.yaml +++ b/.github/workflows/pulsar-ci.yaml @@ -1498,7 +1498,7 @@ jobs: - name: trigger dependency check run: | mvn -B -ntp verify -PskipDocker,skip-all,owasp-dependency-check -Dcheckstyle.skip=true -DskipTests \ - -pl '!distribution/server,!distribution/io,!distribution/offloaders,!tiered-storage/file-system,!pulsar-io/flume,!pulsar-io/hbase,!pulsar-io/hdfs2,!pulsar-io/hdfs3,!pulsar-io/docs,!pulsar-io/jdbc/openmldb' + -pl '!distribution/server,!distribution/io,!distribution/offloaders,!tiered-storage/file-system,!pulsar-io/flume,!pulsar-io/hbase,!pulsar-io/hdfs3,!pulsar-io/docs,!pulsar-io/jdbc/openmldb' - name: Upload report uses: actions/upload-artifact@v4 diff --git a/deployment/terraform-ansible/deploy-pulsar.yaml b/deployment/terraform-ansible/deploy-pulsar.yaml index db2fd1257ca41..3a9f0fd942c17 100644 --- a/deployment/terraform-ansible/deploy-pulsar.yaml +++ b/deployment/terraform-ansible/deploy-pulsar.yaml @@ -147,7 +147,6 @@ # - file # - flume # - hbase -# - hdfs2 # - hdfs3 # - influxdb # - jdbc-clickhouse diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml index f98ee14bb20c9..cf7731b4c85ab 100644 --- a/distribution/io/src/assemble/io.xml +++ b/distribution/io/src/assemble/io.xml @@ -63,7 +63,6 @@ ${basedir}/../../pulsar-io/kafka-connect-adaptor-nar/target/pulsar-io-kafka-connect-adaptor-${project.version}.nar ${basedir}/../../pulsar-io/hbase/target/pulsar-io-hbase-${project.version}.nar ${basedir}/../../pulsar-io/kinesis/target/pulsar-io-kinesis-${project.version}.nar - ${basedir}/../../pulsar-io/hdfs2/target/pulsar-io-hdfs2-${project.version}.nar ${basedir}/../../pulsar-io/hdfs3/target/pulsar-io-hdfs3-${project.version}.nar ${basedir}/../../pulsar-io/file/target/pulsar-io-file-${project.version}.nar ${basedir}/../../pulsar-io/data-generator/target/pulsar-io-data-generator-${project.version}.nar diff --git a/pom.xml b/pom.xml index f99eb3066d5e6..b89dd1597cc84 100644 --- a/pom.xml +++ b/pom.xml @@ -196,7 +196,6 @@ flexible messaging model and an intuitive client API. 0.4.6 2.7.5 0.4.4-hotfix1 - 3.3.5 2.4.10 2.16.0 8.12.1 @@ -207,9 +206,10 @@ flexible messaging model and an intuitive client API. 1.15.16.Final 0.11.1 0.28.0 - 2.10.2 - 3.3.5 - 2.4.16 + 3.4.0 + 3.6.2 + ${hadoop3.version} + 2.6.0-hadoop3 32.1.2-jre 1.0 0.16.1 @@ -1313,6 +1313,58 @@ flexible messaging model and an intuitive client API. ${commons.collections4.version} + + + org.apache.hadoop + hadoop-common + ${hadoop3.version} + + + dnsjava + dnsjava + + + + + org.apache.hadoop + hadoop-auth + ${hadoop3.version} + + + dnsjava + dnsjava + + + + + org.apache.hadoop + hadoop-client + ${hadoop3.version} + + + dnsjava + dnsjava + + + + + org.apache.hbase + hbase-client + ${hbase.version} + + + dnsjava + dnsjava + + + + + + dnsjava + dnsjava + ${dnsjava3.version} + + com.lmax diff --git a/pulsar-bom/pom.xml b/pulsar-bom/pom.xml index d195411fa6479..e674301f18a3a 100644 --- a/pulsar-bom/pom.xml +++ b/pulsar-bom/pom.xml @@ -495,11 +495,6 @@ pulsar-io-hbase ${project.version} - - org.apache.pulsar - pulsar-io-hdfs2 - ${project.version} - org.apache.pulsar pulsar-io-hdfs3 diff --git a/pulsar-io/docs/pom.xml b/pulsar-io/docs/pom.xml index ac4ae9496d1bb..e373db26c450d 100644 --- a/pulsar-io/docs/pom.xml +++ b/pulsar-io/docs/pom.xml @@ -127,11 +127,6 @@ pulsar-io-hbase ${project.version} - - ${project.groupId} - pulsar-io-hdfs2 - ${project.version} - ${project.groupId} pulsar-io-hdfs3 diff --git a/pulsar-io/hdfs2/pom.xml b/pulsar-io/hdfs2/pom.xml deleted file mode 100644 index d5fb33c170db1..0000000000000 --- a/pulsar-io/hdfs2/pom.xml +++ /dev/null @@ -1,130 +0,0 @@ - - - 4.0.0 - - org.apache.pulsar - pulsar-io - 4.0.0-SNAPSHOT - - pulsar-io-hdfs2 - Pulsar IO :: Hdfs2 - - - - ${project.groupId} - pulsar-io-core - ${project.version} - - - - com.fasterxml.jackson.core - jackson-databind - - - - com.fasterxml.jackson.dataformat - jackson-dataformat-yaml - - - - org.apache.commons - commons-collections4 - - - - org.apache.hadoop - hadoop-client - ${hadoop2.version} - - - log4j - log4j - - - org.slf4j - * - - - org.apache.avro - avro - - - - - org.apache.commons - commons-lang3 - - - - - - - org.apache.nifi - nifi-nar-maven-plugin - - - com.github.spotbugs - spotbugs-maven-plugin - ${spotbugs-maven-plugin.version} - - ${basedir}/src/main/resources/findbugsExclude.xml - - - - spotbugs - verify - - check - - - - - - - - - - owasp-dependency-check - - - - org.owasp - dependency-check-maven - - - - aggregate - - none - - - - - - - - - \ No newline at end of file diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConfig.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConfig.java deleted file mode 100644 index 757360e04533c..0000000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConfig.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2; - -import java.io.Serializable; -import lombok.Data; -import lombok.experimental.Accessors; -import org.apache.commons.lang.StringUtils; - -/** - * Configuration object for all HDFS components. - */ -@Data -@Accessors(chain = true) -public abstract class AbstractHdfsConfig implements Serializable { - - private static final long serialVersionUID = 1L; - - /** - * A file or comma separated list of files which contains the Hadoop file system configuration, - * e.g. 'core-site.xml', 'hdfs-site.xml'. - */ - private String hdfsConfigResources; - - /** - * The HDFS directory from which files should be read from or written to. - */ - private String directory; - - /** - * The character encoding for the files, e.g. UTF-8, ASCII, etc. - */ - private String encoding; - - /** - * The compression codec used to compress/de-compress the files on HDFS. - */ - private Compression compression; - - /** - * The Kerberos user principal account to use for authentication. - */ - private String kerberosUserPrincipal; - - /** - * The full pathname to the Kerberos keytab file to use for authentication. - */ - private String keytab; - - public void validate() { - if (StringUtils.isEmpty(hdfsConfigResources) || StringUtils.isEmpty(directory)) { - throw new IllegalArgumentException("Required property not set."); - } - - if ((StringUtils.isNotEmpty(kerberosUserPrincipal) && StringUtils.isEmpty(keytab)) - || (StringUtils.isEmpty(kerberosUserPrincipal) && StringUtils.isNotEmpty(keytab))) { - throw new IllegalArgumentException("Values for both kerberosUserPrincipal & keytab are required."); - } - } -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java deleted file mode 100644 index d7277aa627383..0000000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2; - -import java.io.IOException; -import java.lang.ref.WeakReference; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.URI; -import java.nio.charset.Charset; -import java.security.PrivilegedExceptionAction; -import java.util.Collections; -import java.util.Map; -import java.util.WeakHashMap; -import java.util.concurrent.atomic.AtomicReference; -import javax.net.SocketFactory; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.io.compress.DefaultCodec; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.pulsar.io.hdfs2.sink.HdfsSinkConfig; - -/** - * A Simple abstract class for HDFS connectors. - * Provides methods for connecting to HDFS - */ -public abstract class AbstractHdfsConnector { - - private static final Object RESOURCES_LOCK = new Object(); - - // Hadoop Configuration, Filesystem, and UserGroupInformation (optional) - protected final AtomicReference hdfsResources = new AtomicReference<>(); - protected AbstractHdfsConfig connectorConfig; - protected CompressionCodecFactory compressionCodecFactory; - - public AbstractHdfsConnector() { - hdfsResources.set(new HdfsResources(null, null, null)); - } - - /* - * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources. - */ - protected HdfsResources resetHDFSResources(HdfsSinkConfig hdfsSinkConfig) throws IOException { - Configuration config = new ExtendedConfiguration(); - config.setClassLoader(Thread.currentThread().getContextClassLoader()); - - getConfig(config, connectorConfig.getHdfsConfigResources()); - - // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout - checkHdfsUriForTimeout(config); - - /* Disable caching of Configuration and FileSystem objects, else we cannot reconfigure - * the processor without a complete restart - */ - String disableCacheName = String.format("fs.%s.impl.disable.cache", - FileSystem.getDefaultUri(config).getScheme()); - config.set(disableCacheName, "true"); - - // If kerberos is enabled, create the file system as the kerberos principal - // -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at at time - FileSystem fs; - UserGroupInformation ugi; - synchronized (RESOURCES_LOCK) { - if (SecurityUtil.isSecurityEnabled(config)) { - ugi = SecurityUtil.loginKerberos(config, - connectorConfig.getKerberosUserPrincipal(), connectorConfig.getKeytab()); - fs = getFileSystemAsUser(config, ugi); - } else { - config.set("ipc.client.fallback-to-simple-auth-allowed", "true"); - config.set("hadoop.security.authentication", "simple"); - ugi = SecurityUtil.loginSimple(config); - fs = getFileSystemAsUser(config, ugi); - } - } - return new HdfsResources(config, fs, ugi); - } - - private static Configuration getConfig(final Configuration config, String res) throws IOException { - boolean foundResources = false; - if (null != res) { - String[] resources = res.split(","); - for (String resource : resources) { - config.addResource(new Path(resource.trim())); - foundResources = true; - } - } - - if (!foundResources) { - // check that at least 1 non-default resource is available on the classpath - String configStr = config.toString(); - for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) { - if (!resource.contains("default") && config.getResource(resource.trim()) != null) { - foundResources = true; - break; - } - } - } - - if (!foundResources) { - throw new IOException("Could not find any of the " + res + " on the classpath"); - } - return config; - } - - /* - * Reduce the timeout of a socket connection from the default in FileSystem.get() - */ - protected void checkHdfsUriForTimeout(Configuration config) throws IOException { - URI hdfsUri = FileSystem.getDefaultUri(config); - String address = hdfsUri.getAuthority(); - int port = hdfsUri.getPort(); - if (address == null || address.isEmpty() || port < 0) { - return; - } - InetSocketAddress namenode = NetUtils.createSocketAddr(address, port); - SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config); - try (Socket socket = socketFactory.createSocket()) { - NetUtils.connect(socket, namenode, 1000); // 1 second timeout - } - } - - /** - * This exists in order to allow unit tests to override it so that they don't take several - * minutes waiting for UDP packets to be received. - * - * @param config - * the configuration to use - * @return the FileSystem that is created for the given Configuration - * @throws IOException - * if unable to create the FileSystem - */ - protected FileSystem getFileSystem(final Configuration config) throws IOException { - return FileSystem.get(config); - } - - protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException { - try { - return ugi.doAs((PrivilegedExceptionAction) () -> FileSystem.get(config)); - } catch (InterruptedException e) { - throw new IOException("Unable to create file system: " + e.getMessage()); - } - } - - protected Configuration getConfiguration() { - return hdfsResources.get().getConfiguration(); - } - - protected FileSystem getFileSystem() { - return hdfsResources.get().getFileSystem(); - } - - protected UserGroupInformation getUserGroupInformation() { - return hdfsResources.get().getUserGroupInformation(); - } - - protected String getEncoding() { - return StringUtils.isNotBlank(connectorConfig.getEncoding()) - ? connectorConfig.getEncoding() : Charset.defaultCharset().name(); - } - - protected CompressionCodec getCompressionCodec() { - if (connectorConfig.getCompression() == null) { - return null; - } - - CompressionCodec codec = getCompressionCodecFactory() - .getCodecByName(connectorConfig.getCompression().name()); - - return (codec != null) ? codec : new DefaultCodec(); - } - - protected CompressionCodecFactory getCompressionCodecFactory() { - if (compressionCodecFactory == null) { - compressionCodecFactory = new CompressionCodecFactory(getConfiguration()); - } - - return compressionCodecFactory; - } - - /** - * Extending Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be - * adding additional JARs to the classpath we don't want them to have to restart the JVM to be able to load - * something that was previously not found, but might now be available. - * Reference the original getClassByNameOrNull from Configuration. - */ - static class ExtendedConfiguration extends Configuration { - - private final Map>>> cacheClasses = new WeakHashMap<>(); - - @Override - public Class getClassByNameOrNull(String name) { - final ClassLoader classLoader = getClassLoader(); - - Map>> map; - synchronized (cacheClasses) { - map = cacheClasses.get(classLoader); - if (map == null) { - map = Collections.synchronizedMap(new WeakHashMap<>()); - cacheClasses.put(classLoader, map); - } - } - - Class clazz = null; - WeakReference> ref = map.get(name); - if (ref != null) { - clazz = ref.get(); - } - - if (clazz == null) { - try { - clazz = Class.forName(name, true, classLoader); - } catch (ClassNotFoundException | NoClassDefFoundError e) { - return null; - } - // two putters can race here, but they'll put the same class - map.put(name, new WeakReference<>(clazz)); - return clazz; - } else { - // cache hit - return clazz; - } - } - - } -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/Compression.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/Compression.java deleted file mode 100644 index 1e3d2f9490439..0000000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/Compression.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2; - -/** - * An enumeration of compression codecs available for HDFS. - */ -public enum Compression { - BZIP2, DEFLATE, GZIP, LZ4, SNAPPY, ZSTANDARD -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/HdfsResources.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/HdfsResources.java deleted file mode 100644 index 5fd6b283e6b41..0000000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/HdfsResources.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.security.UserGroupInformation; - -/** - * A wrapper class for HDFS resources. - */ -public class HdfsResources { - - private final Configuration configuration; - private final FileSystem fileSystem; - private final UserGroupInformation userGroupInformation; - - public HdfsResources(Configuration config, FileSystem fs, UserGroupInformation ugi) { - this.configuration = config; - this.fileSystem = fs; - this.userGroupInformation = ugi; - } - - public Configuration getConfiguration() { - return configuration; - } - - public FileSystem getFileSystem() { - return fileSystem; - } - - public UserGroupInformation getUserGroupInformation() { - return userGroupInformation; - } -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/SecurityUtil.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/SecurityUtil.java deleted file mode 100644 index ca178aad911e2..0000000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/SecurityUtil.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2; - -import java.io.IOException; -import org.apache.commons.lang3.Validate; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; - -/** - * Provides synchronized access to UserGroupInformation to avoid multiple processors/services from - * interfering with each other. - */ -public class SecurityUtil { - public static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication"; - public static final String KERBEROS = "kerberos"; - - /** - * Initializes UserGroupInformation with the given Configuration and performs the login for the - * given principal and keytab. All logins should happen through this class to ensure other threads - * are not concurrently modifying UserGroupInformation. - *

- * @param config the configuration instance - * @param principal the principal to authenticate as - * @param keyTab the keytab to authenticate with - * - * @return the UGI for the given principal - * - * @throws IOException if login failed - */ - public static synchronized UserGroupInformation loginKerberos(final Configuration config, - final String principal, final String keyTab) throws IOException { - Validate.notNull(config); - Validate.notNull(principal); - Validate.notNull(keyTab); - - UserGroupInformation.setConfiguration(config); - UserGroupInformation.loginUserFromKeytab(principal.trim(), keyTab.trim()); - return UserGroupInformation.getCurrentUser(); - } - - /** - * Initializes UserGroupInformation with the given Configuration and - * returns UserGroupInformation.getLoginUser(). All logins should happen - * through this class to ensure other threads are not concurrently - * modifying UserGroupInformation. - * - * @param config the configuration instance - * - * @return the UGI for the given principal - * - * @throws IOException if login failed - */ - public static synchronized UserGroupInformation loginSimple(final Configuration config) throws IOException { - Validate.notNull(config); - UserGroupInformation.setConfiguration(config); - return UserGroupInformation.getLoginUser(); - } - - /** - * Initializes UserGroupInformation with the given Configuration and returns - * UserGroupInformation.isSecurityEnabled(). - * All checks for isSecurityEnabled() should happen through this method. - * - * @param config the given configuration - * - * @return true if kerberos is enabled on the given configuration, false otherwise - * - */ - public static boolean isSecurityEnabled(final Configuration config) { - Validate.notNull(config); - return KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION)); - } -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/package-info.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/package-info.java deleted file mode 100644 index 464c6db341e8f..0000000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/package-info.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2; \ No newline at end of file diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java deleted file mode 100644 index 7b025d16378ff..0000000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink; - -import java.io.IOException; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.FilenameUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.KeyValue; -import org.apache.pulsar.io.core.Sink; -import org.apache.pulsar.io.core.SinkContext; -import org.apache.pulsar.io.hdfs2.AbstractHdfsConnector; -import org.apache.pulsar.io.hdfs2.HdfsResources; - -/** - * A Simple abstract class for HDFS sink. - * Users need to implement extractKeyValue function to use this sink. - */ -@Slf4j -public abstract class HdfsAbstractSink extends AbstractHdfsConnector implements Sink { - - protected HdfsSinkConfig hdfsSinkConfig; - protected BlockingQueue> unackedRecords; - protected HdfsSyncThread syncThread; - private Path path; - private FSDataOutputStream hdfsStream; - private DateTimeFormatter subdirectoryFormatter; - - public abstract KeyValue extractKeyValue(Record record); - protected abstract void createWriter() throws IOException; - - @Override - public void open(Map config, SinkContext sinkContext) throws Exception { - hdfsSinkConfig = HdfsSinkConfig.load(config); - hdfsSinkConfig.validate(); - connectorConfig = hdfsSinkConfig; - unackedRecords = new LinkedBlockingQueue> (hdfsSinkConfig.getMaxPendingRecords()); - if (hdfsSinkConfig.getSubdirectoryPattern() != null) { - subdirectoryFormatter = DateTimeFormatter.ofPattern(hdfsSinkConfig.getSubdirectoryPattern()); - } - connectToHdfs(); - createWriter(); - launchSyncThread(); - } - - @Override - public void close() throws Exception { - syncThread.halt(); - syncThread.join(0); - } - - protected final void connectToHdfs() throws IOException { - try { - HdfsResources resources = hdfsResources.get(); - - if (resources.getConfiguration() == null) { - resources = this.resetHDFSResources(hdfsSinkConfig); - hdfsResources.set(resources); - } - } catch (IOException ex) { - hdfsResources.set(new HdfsResources(null, null, null)); - throw ex; - } - } - - protected FSDataOutputStream getHdfsStream() throws IllegalArgumentException, IOException { - if (hdfsStream == null) { - Path path = getPath(); - FileSystem fs = getFileSystemAsUser(getConfiguration(), getUserGroupInformation()); - hdfsStream = fs.exists(path) ? fs.append(path) : fs.create(path); - } - return hdfsStream; - } - - protected final Path getPath() { - if (path == null) { - String ext = ""; - if (StringUtils.isNotBlank(hdfsSinkConfig.getFileExtension())) { - ext = hdfsSinkConfig.getFileExtension(); - } else if (getCompressionCodec() != null) { - ext = getCompressionCodec().getDefaultExtension(); - } - - String directory = hdfsSinkConfig.getDirectory(); - if (subdirectoryFormatter != null) { - directory = FilenameUtils.concat(directory, LocalDateTime.now().format(subdirectoryFormatter)); - } - path = new Path(FilenameUtils.concat(directory, - hdfsSinkConfig.getFilenamePrefix() + "-" + System.currentTimeMillis() + ext)); - log.info("Create path: {}", path); - } - return path; - } - - protected final void launchSyncThread() throws IOException { - syncThread = new HdfsSyncThread(getHdfsStream(), unackedRecords, hdfsSinkConfig.getSyncInterval()); - syncThread.start(); - } -} \ No newline at end of file diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java deleted file mode 100644 index 9e1c6090fb5b7..0000000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.Map; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.experimental.Accessors; -import org.apache.commons.lang.StringUtils; -import org.apache.pulsar.io.hdfs2.AbstractHdfsConfig; - -/** - * Configuration object for all HDFS Sink components. - */ -@Data -@EqualsAndHashCode(callSuper = false) -@Accessors(chain = true) -public class HdfsSinkConfig extends AbstractHdfsConfig implements Serializable { - - private static final long serialVersionUID = 1L; - - /** - * The prefix of the files to create inside the HDFS directory, i.e. a value of "topicA" - * will result in files named topicA-, topicA-, etc being produced - */ - private String filenamePrefix; - - /** - * The extension to add to the files written to HDFS, e.g. '.txt', '.seq', etc. - */ - private String fileExtension; - - /** - * The character to use to separate records in a text file. If no value is provided - * then the content from all of the records will be concatenated together in one continuous - * byte array. - */ - private char separator; - - /** - * The interval (in milliseconds) between calls to flush data to HDFS disk. - */ - private long syncInterval; - - /** - * The maximum number of records that we hold in memory before acking. Default is Integer.MAX_VALUE. - * Setting this value to one, results in every record being sent to disk before the record is acked, - * while setting it to a higher values allows us to buffer records before flushing them all to disk. - */ - private int maxPendingRecords = Integer.MAX_VALUE; - - /** - * A subdirectory associated with the created time of the sink. - * The pattern is the formatted pattern of {@link AbstractHdfsConfig#getDirectory()}'s subdirectory. - * - * @see java.time.format.DateTimeFormatter for pattern's syntax - */ - private String subdirectoryPattern; - - public static HdfsSinkConfig load(String yamlFile) throws IOException { - ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - return mapper.readValue(new File(yamlFile), HdfsSinkConfig.class); - } - - public static HdfsSinkConfig load(Map map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(mapper.writeValueAsString(map), HdfsSinkConfig.class); - } - - @Override - public void validate() { - super.validate(); - if ((StringUtils.isEmpty(fileExtension) && getCompression() == null) - || StringUtils.isEmpty(filenamePrefix)) { - throw new IllegalArgumentException("Required property not set."); - } - - if (syncInterval < 0) { - throw new IllegalArgumentException("Sync Interval cannot be negative"); - } - - if (maxPendingRecords < 1) { - throw new IllegalArgumentException("Max Pending Records must be a positive integer"); - } - - if (subdirectoryPattern != null) { - try { - LocalDateTime.of(2020, 1, 1, 12, 0).format(DateTimeFormatter.ofPattern(subdirectoryPattern)); - } catch (Exception e) { - throw new IllegalArgumentException(subdirectoryPattern + " is not a valid pattern: " + e.getMessage()); - } - } - } -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSyncThread.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSyncThread.java deleted file mode 100644 index 9ddd83f4423f9..0000000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSyncThread.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink; - -import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.hadoop.fs.Syncable; -import org.apache.pulsar.functions.api.Record; - -/** - * A thread that runs in the background and acknowledges Records - * after they have been written to disk. - * - * @param - */ -public class HdfsSyncThread extends Thread { - - private final Syncable stream; - private final BlockingQueue> unackedRecords; - private final long syncInterval; - private boolean keepRunning = true; - - public HdfsSyncThread(Syncable stream, BlockingQueue> unackedRecords, long syncInterval) { - this.stream = stream; - this.unackedRecords = unackedRecords; - this.syncInterval = syncInterval; - } - - @Override - public void run() { - while (keepRunning) { - try { - Thread.sleep(syncInterval); - ackRecords(); - } catch (InterruptedException e) { - return; - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - public final void halt() throws IOException, InterruptedException { - keepRunning = false; - ackRecords(); - } - - private void ackRecords() throws IOException, InterruptedException { - - if (CollectionUtils.isEmpty(unackedRecords)) { - return; - } - - synchronized (stream) { - stream.hsync(); - } - - while (!unackedRecords.isEmpty()) { - unackedRecords.take().ack(); - } - } -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/package-info.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/package-info.java deleted file mode 100644 index 238a441ee0ee3..0000000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/package-info.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink; \ No newline at end of file diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.java deleted file mode 100644 index 355c00080effe..0000000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink.seq; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.Writer; -import org.apache.hadoop.io.SequenceFile.Writer.Option; -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.KeyValue; -import org.apache.pulsar.io.core.Sink; -import org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * HDFS Sink that writes it contents to HDFS as Sequence Files. - * - * @param - The incoming Key type - * @param - The incoming Value type - * @param - The HDFS Key type - * @param - The HDFS Value type - */ -public abstract class HdfsAbstractSequenceFileSink - extends HdfsAbstractSink implements Sink { - - private static final Logger LOG = LoggerFactory.getLogger(HdfsAbstractSequenceFileSink.class); - - protected AtomicLong counter; - protected FSDataOutputStream hdfsStream; - protected Writer writer = null; - - public abstract KeyValue convert(KeyValue kv); - - @Override - public void close() throws Exception { - writer.close(); - super.close(); - } - - @Override - protected void createWriter() throws IOException { - writer = getWriter(); - } - - @Override - public void write(Record record) { - try { - KeyValue kv = extractKeyValue(record); - KeyValue keyValue = convert(kv); - writer.append(keyValue.getKey(), keyValue.getValue()); - unackedRecords.put(record); - } catch (IOException | InterruptedException e) { - LOG.error("Unable to write to file " + getPath(), e); - record.fail(); - } - } - - protected Writer getWriter() throws IOException { - counter = new AtomicLong(0); - List