Skip to content

Commit

Permalink
Merge branch 'master' into rxsocks
Browse files Browse the repository at this point in the history
  • Loading branch information
RockyLOMO committed Apr 20, 2024
2 parents 46be419 + baf2596 commit 1eaf454
Show file tree
Hide file tree
Showing 39 changed files with 3,131 additions and 14 deletions.
55 changes: 52 additions & 3 deletions rxlib-x/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.github.rockylomo</groupId>
Expand All @@ -17,6 +18,13 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<lombok.version>1.18.30</lombok.version>
<mysql.version>5.1.49</mysql.version>
<HikariCP.version>4.0.3</HikariCP.version>
<druid.version>1.2.21</druid.version>
<redisson.version>3.27.2</redisson.version>
<guava.version>32.1.1-jre</guava.version>

<javaxMail.version>1.6.2</javaxMail.version>
<poi.version>5.2.2</poi.version>
</properties>

<dependencies>
Expand All @@ -33,7 +41,7 @@
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>4.0.3</version>
<version>${HikariCP.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
Expand All @@ -44,7 +52,48 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.21</version>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>${redisson.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>byte-buddy</artifactId>
<groupId>net.bytebuddy</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- pinyin-->
<dependency>
<groupId>org.ahocorasick</groupId>
<artifactId>ahocorasick</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>${poi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>${poi.version}</version>
</dependency>
<dependency>
<groupId>com.sun.mail</groupId>
<artifactId>javax.mail</artifactId>
<version>${javaxMail.version}</version>
</dependency>
</dependencies>
</project>
98 changes: 98 additions & 0 deletions rxlib-x/src/main/java/org/rx/crawler/Browser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package org.rx.crawler;

import org.rx.core.FluentWait;
import org.rx.core.Linq;
import org.rx.core.Reflects;
import org.rx.io.IOStream;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Contract for a closeable browser handle used by the crawler package.
 * <p>
 * Only declarations are visible in this interface; apart from the two default methods and the
 * static helper, the behaviour of every method is defined by the implementing class.
 */
public interface Browser extends AutoCloseable {
// BLANK_URL is the address loaded by navigateBlank(); BODY_SELECTOR holds the selector text "body".
String BLANK_URL = "about:blank", BODY_SELECTOR = "body";

/**
 * Reads the resource resolved by {@code Reflects.getResource(resourcePath)} as a UTF-8 string.
 *
 * @param resourcePath path handed to {@code Reflects.getResource}
 * @return the resource content decoded with UTF-8
 */
static String readResourceJs(String resourcePath) {
return IOStream.readString(Reflects.getResource(resourcePath), StandardCharsets.UTF_8);
}

/**
 * @return the {@link BrowserType} of this browser
 */
BrowserType getType();

// NOTE(review): meaning of "cookie region" is not visible here - confirm against the implementation.
void setCookieRegion(String cookieRegion);

/**
 * @return a duration in milliseconds; it is passed as the second argument of
 * {@code FluentWait.polling} by {@link #createWait(int)}
 */
long getWaitMillis();

String getCurrentUrl();

/**
 * Builds a {@code FluentWait} through {@code FluentWait.polling}, passing the timeout converted
 * to milliseconds and {@link #getWaitMillis()}.
 *
 * @param timeoutSeconds timeout in seconds; widened to {@code long} before multiplying by 1000
 * @return the wait created by {@code FluentWait.polling}
 */
default FluentWait createWait(int timeoutSeconds) {
// NOTE(review): second argument is presumably the polling interval - confirm against FluentWait.
return FluentWait.polling(timeoutSeconds * 1000L, getWaitMillis());
}

/**
 * Calls {@link #nativeGet(String)} with {@link #BLANK_URL}.
 */
default void navigateBlank() {
nativeGet(BLANK_URL);
}

// The three navigateUrl overloads differ only in the optional locator selector and timeout;
// all of them may throw TimeoutException.
void navigateUrl(String url) throws TimeoutException;

void navigateUrl(String url, String locatorSelector) throws TimeoutException;

void navigateUrl(String url, String locatorSelector, int timeoutSeconds) throws TimeoutException;

// Unlike navigateUrl, this variant declares no TimeoutException.
void nativeGet(String url);

String saveCookies(boolean reset) throws TimeoutException;

void clearCookies(boolean onlyBrowser);

void setRawCookie(String rawCookie);

String getRawCookie();

/**
 * Accepts basic selectors only; the selector must not contain {@code :eq(1)} and similar extensions.
 *
 * @param selector
 * @return
 */
boolean hasElement(String selector);

// The element* accessors come in pairs: a single String result and a Linq<String> of results.
String elementText(String selector);

Linq<String> elementsText(String selector);

String elementVal(String selector);

Linq<String> elementsVal(String selector);

String elementAttr(String selector, String... attrArgs);

Linq<String> elementsAttr(String selector, String... attrArgs);

void elementClick(String selector);

void elementClick(String selector, boolean waitElementLocated);

void elementPress(String selector, String keys);

void elementPress(String selector, String keys, boolean waitElementLocated);

void waitElementLocated(String selector) throws TimeoutException;

void waitElementLocated(String selector, int timeoutSeconds) throws TimeoutException;

void injectScript(String script);

//Boolean, Long, String, List, WebElement
// NOTE(review): the list above presumably names the possible result types of executeScript - confirm.
<T> T executeScript(String script, Object... args);

<T> T injectAndExecuteScript(String injectScript, String script, Object... args);

<T> T executeConfigureScript(String scriptName, Object... args);

byte[] screenshotAsBytes(String selector);

void focus();

void maximize();

void normalize();
}
21 changes: 21 additions & 0 deletions rxlib-x/src/main/java/org/rx/crawler/BrowserAsyncRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.rx.crawler;

import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

import java.io.Serializable;
import java.util.UUID;

/**
 * Serializable request carrying an async id, a priority and a URL.
 * <p>
 * Lombok generates the getters and a constructor taking the three final fields in declaration
 * order. Instances order themselves by {@code priority} alone.
 */
@Getter
@RequiredArgsConstructor
public class BrowserAsyncRequest implements Serializable, Comparable<BrowserAsyncRequest> {
    private final UUID asyncId;
    private final int priority;
    private final String url;

    /**
     * Compares two requests by their numeric priority.
     *
     * @param o the request to compare against; must not be null (enforced by Lombok's {@code @NonNull})
     * @return a negative value, zero or a positive value when this priority is lower than,
     * equal to or higher than the other one
     */
    @Override
    public int compareTo(@NonNull BrowserAsyncRequest o) {
        final int ownPriority = this.priority;
        final int otherPriority = o.priority;
        return Integer.compare(ownPriority, otherPriority);
    }
}
14 changes: 14 additions & 0 deletions rxlib-x/src/main/java/org/rx/crawler/BrowserAsyncResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.rx.crawler;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.io.Serializable;
import java.net.InetSocketAddress;

/**
 * Serializable value pairing a {@link BrowserAsyncRequest} with an {@link InetSocketAddress}.
 * <p>
 * Lombok generates the getters and a constructor taking both final fields in declaration order.
 */
@Getter
@RequiredArgsConstructor
public class BrowserAsyncResponse implements Serializable {
// The request this response refers to.
private final BrowserAsyncRequest request;
// Address that BrowserAsyncTopic hands to RemoteBrowser.wrap; its port is also used as the publish marker.
private final InetSocketAddress endpoint;
}
159 changes: 159 additions & 0 deletions rxlib-x/src/main/java/org/rx/crawler/BrowserAsyncTopic.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package org.rx.crawler;

import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RPriorityBlockingQueue;
import org.redisson.api.RSetCache;
import org.redisson.api.RTopic;
import org.rx.core.Constants;
import org.rx.core.ResetEventWait;
import org.rx.exception.TraceHandler;
import org.rx.redis.RedisCache;
import org.rx.util.function.TripleAction;
import org.rx.util.function.TripleFunc;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;

import static org.rx.core.Sys.toJsonString;

/**
 * Redis-backed hand-off between the side that requests a browser and the side that provides one.
 * <p>
 * Requests are placed on a Redisson priority queue ({@link #QUEUE_NAME}); responses are broadcast
 * on a Redisson topic ({@link #TOPIC_NAME}). A caller registers a callback for an async id via
 * {@code listen}; when a response for that id arrives, the callback is invoked with a
 * {@code RemoteBrowser} wrapped around the response endpoint and with the requested URL.
 * The set {@link #IN_PUBLISH_NAME} records the endpoint ports that currently have a response in flight.
 */
@RequiredArgsConstructor
@Component
@ConditionalOnProperty(name = org.rx.spring.BeanRegister.REDIS_PROP_NAME)
@Slf4j
public class BrowserAsyncTopic {
    /**
     * Future handed out by {@code listen}. It completes when the topic listener has run the
     * callback (normally or exceptionally) or when it is cancelled.
     */
    @RequiredArgsConstructor
    private class AsyncFuture<T> implements Future<T> {
        private final UUID asyncId;
        private final Object callback;
        private final ResetEventWait waiter = new ResetEventWait();
        @Getter
        private volatile boolean done;
        // Explicit flag: map membership cannot tell "cancelled" apart from "completed and removed".
        private volatile boolean cancelled;
        private volatile Throwable exception;
        // Written before the volatile write to 'done', read only after 'done' is observed.
        private T result;

        /**
         * Cancels the future if it has not completed yet and is still registered.
         * A callback that is already running is not interrupted, but its outcome is discarded.
         *
         * @param mayInterruptIfRunning ignored
         * @return {@code true} if this call cancelled the future
         */
        @Override
        public synchronized boolean cancel(boolean mayInterruptIfRunning) {
            if (done || !callbacks.remove(asyncId, this)) {
                return false;
            }
            cancelled = true;
            // Wake up threads blocked in get(); previously they waited forever after a cancel.
            release();
            return true;
        }

        @Override
        public boolean isCancelled() {
            return cancelled;
        }

        /**
         * Waits without a time limit.
         *
         * @return the callback result, or {@code null} for action callbacks
         * @throws ExecutionException    if the callback threw
         * @throws CancellationException if the future was cancelled
         */
        @Override
        public T get() throws ExecutionException {
            try {
                return get(Constants.TIMEOUT_INFINITE, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Not expected with an infinite timeout; kept as a best-effort fallback.
                log.warn("ignore", e);
            }
            return null;
        }

        /**
         * Waits up to the given time for completion.
         *
         * @throws TimeoutException      if the wait elapsed before completion
         * @throws ExecutionException    if the callback threw
         * @throws CancellationException if the future was cancelled
         */
        @Override
        public T get(long timeout, TimeUnit unit) throws ExecutionException, TimeoutException {
            // Skip the wait when already complete so repeated get() calls never block.
            if (!done && !waiter.waitOne(TimeUnit.MILLISECONDS.convert(timeout, unit))) {
                throw new TimeoutException();
            }
            if (cancelled) {
                throw new CancellationException();
            }
            if (exception != null) {
                throw new ExecutionException(exception);
            }
            return result;
        }

        /**
         * Marks the future as done and signals waiters exactly once.
         */
        private synchronized void release() {
            if (done) {
                return;
            }
            done = true;
            waiter.set();
        }
    }

    // Avoid listening on the topic more than once.
    public static final String QUEUE_NAME = "BAsyncQueue", TOPIC_NAME = "BAsyncTopic", IN_PUBLISH_NAME = "BAsyncPublish";
    private final RedisCache<?, ?> redisCache;
    private RPriorityBlockingQueue<BrowserAsyncRequest> queue;
    private RTopic topic;
    private RSetCache<Integer> publishSet;
    private final ConcurrentHashMap<UUID, AsyncFuture<?>> callbacks = new ConcurrentHashMap<>();

    /**
     * Resolves the Redisson queue, topic and set, and registers the single topic listener.
     */
    @PostConstruct
    public void init() {
        queue = redisCache.getClient().getPriorityBlockingQueue(QUEUE_NAME);
        topic = redisCache.getClient().getTopic(TOPIC_NAME);
        publishSet = redisCache.getClient().getSetCache(IN_PUBLISH_NAME);
//        require(queue, queue.trySetComparator(Comparator.comparingInt(BrowserAsyncRequest::getPriority)));
        topic.addListener(BrowserAsyncResponse.class, (channel, asyncResponse) -> onResponse(asyncResponse));
        log.info("register BrowserAsyncTopic ok");
    }

    /**
     * Handles one published response: runs the callback registered for its async id, if any,
     * and always clears the endpoint port from the in-publish set afterwards.
     */
    @SuppressWarnings("unchecked")
    private void onResponse(BrowserAsyncResponse asyncResponse) {
        log.info("Async consume response {}", toJsonString(asyncResponse));
        try {
            // The value type is erased; the callback type decides whether a result is produced.
            AsyncFuture<Object> future = (AsyncFuture<Object>) callbacks.get(asyncResponse.getRequest().getAsyncId());
            if (future == null || future.isCancelled() || future.isDone()) {
                return;
            }
            String url = asyncResponse.getRequest().getUrl();
            try (RemoteBrowser browser = RemoteBrowser.wrap(asyncResponse.getEndpoint())) {
                // Re-check: the future may have been cancelled while the browser was being wrapped.
                if (future.isCancelled()) {
                    return;
                }
                if (future.callback instanceof TripleFunc) {
                    future.result = ((TripleFunc<RemoteBrowser, String, Object>) future.callback).invoke(browser, url);
                    return;
                }
                ((TripleAction<RemoteBrowser, String>) future.callback).invoke(browser, url);
            } catch (Throwable e) {
                TraceHandler.INSTANCE.log("Async {} error", future.asyncId, e);
                future.exception = e;
            } finally {
                // Conditional remove: never evict a newer future registered under the same id.
                callbacks.remove(future.asyncId, future);
                future.release();
            }
        } finally {
            publishSet.remove(asyncResponse.getEndpoint().getPort());
        }
    }

    //region Consume

    /**
     * Adds a request to the priority queue.
     *
     * @param request the request to enqueue; must not be null
     */
    public void add(@NonNull BrowserAsyncRequest request) {
        queue.add(request);
    }

    /**
     * Registers an action callback for the given async id, replacing any previous registration.
     * The raw return type is kept for source compatibility with existing callers.
     *
     * @param asyncId  id of the request to wait for; must not be null
     * @param callback invoked with the remote browser and the request URL; must not be null
     * @return a future that completes once the callback has run
     */
    public Future listen(@NonNull UUID asyncId, @NonNull TripleAction<RemoteBrowser, String> callback) {
        AsyncFuture<Void> future = new AsyncFuture<>(asyncId, callback);
        callbacks.put(asyncId, future);
        return future;
    }

    /**
     * Registers a function callback for the given async id, replacing any previous registration.
     *
     * @param asyncId  id of the request to wait for; must not be null
     * @param callback invoked with the remote browser and the request URL; must not be null
     * @return a future yielding the callback's return value
     */
    public <T> Future<T> listen(@NonNull UUID asyncId, @NonNull TripleFunc<RemoteBrowser, String, T> callback) {
        AsyncFuture<T> future = new AsyncFuture<>(asyncId, callback);
        callbacks.put(asyncId, future);
        return future;
    }
    //endregion

    //region produce

    /**
     * Polls up to {@code takeCount} requests from the queue.
     */
    public List<BrowserAsyncRequest> poll(int takeCount) {
        return queue.poll(takeCount);
    }

    /**
     * Polls a single request from the queue.
     */
    public BrowserAsyncRequest poll() {
        return queue.poll();
    }

    /**
     * @param nextIdleId value looked up in the in-publish set (the set stores endpoint ports)
     * @return {@code true} if the set currently contains the value
     */
    public boolean isPublishing(int nextIdleId) {
        return publishSet.contains(nextIdleId);
    }

    /**
     * Publishes a response on the topic after marking its endpoint port as in-publish for 6 seconds.
     * Responses lacking a request, an async id or an endpoint are logged and dropped.
     *
     * @param response the response to broadcast
     */
    public void publish(BrowserAsyncResponse response) {
        if (response == null || response.getRequest() == null || response.getRequest().getAsyncId() == null || response.getEndpoint() == null) {
            log.warn("Async publish invalid response {}", toJsonString(response));
            return;
        }

        publishSet.add(response.getEndpoint().getPort(), 6, TimeUnit.SECONDS);
        topic.publish(response);
    }
    //endregion
}
5 changes: 5 additions & 0 deletions rxlib-x/src/main/java/org/rx/crawler/BrowserPoolListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.rx.crawler;

/**
 * Closeable listener that can be asked for an id per {@link BrowserType}.
 */
public interface BrowserPoolListener extends AutoCloseable {
/**
 * @param type the browser type to query
 * @return an integer id for that type
 */
// NOTE(review): presumably identifies the next idle browser of the given type - confirm with implementations.
int nextIdleId(BrowserType type);
}
14 changes: 14 additions & 0 deletions rxlib-x/src/main/java/org/rx/crawler/BrowserType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.rx.crawler;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
 * Supported browser kinds, each carrying a process name and a driver name.
 * Lombok generates the constructor and the getters for both fields.
 */
@RequiredArgsConstructor
@Getter
public enum BrowserType {
// Both constants use Windows-style ".exe" names.
CHROME("chrome.exe", "chromedriver.exe"),
IE("iexplore.exe", "IEDriverServer.exe");

// First constructor argument, e.g. "chrome.exe".
private final String processName;
// Second constructor argument, e.g. "chromedriver.exe".
private final String driverName;
}
Loading

0 comments on commit 1eaf454

Please sign in to comment.