From 87f90364bbe4bdf41c40e213a2bc8a7db07dbe92 Mon Sep 17 00:00:00 2001 From: Darin Douglass Date: Wed, 24 Jul 2024 10:51:03 -0400 Subject: [PATCH 1/5] first attempt at fastcdc --- src/clj_rabin/fast.clj | 141 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 src/clj_rabin/fast.clj diff --git a/src/clj_rabin/fast.clj b/src/clj_rabin/fast.clj new file mode 100644 index 0000000..13cc819 --- /dev/null +++ b/src/clj_rabin/fast.clj @@ -0,0 +1,141 @@ +3M(ns clj-rabin.fast + (:require [clojure.java.io :as io] + [clojure.math :as math]) + (:import [java.io BufferedInputStream ByteArrayInputStream ByteArrayOutputStream File InputStream] + [java.security MessageDigest])) + +(def masks + [0 ;; padding + 0 ;; padding + 0 ;; padding + 0 ;; padding + 0 ;; padding + 0x0000000001804110 ;; unused except for NC 3 + 0x0000000001803110 ;; 64B + 0x0000000018035100 ;; 128B + 0x0000001800035300 ;; 256B + 0x0000019000353000 ;; 512B + 0x0000590003530000 ;; 1KB + 0x0000d90003530000 ;; 2KB + 0x0000d90103530000 ;; 4KB + 0x0000d90303530000 ;; 8KB + 0x0000d90313530000 ;; 16KB + 0x0000d90f03530000 ;; 32KB + 0x0000d90303537000 ;; 64KB + 0x0000d90703537000 ;; 128KB + 0x0000d90707537000 ;; 256KB + 0x0000d91707537000 ;; 512KB + 0x0000d91747537000 ;; 1MB + 0x0000d91767537000 ;; 2MB + 0x0000d93767537000 ;; 4MB + 0x0000d93777537000 ;; 8MB + 0x0000d93777577000 ;; 16MB + 0x0000db3777577000 ;; unused except for NC 3 + ]) + +(defn random-long + ^long [^long seed] + (let [bytes (byte-array (repeat 64 (inc seed))) + algorithm (doto (MessageDigest/getInstance "MD5") + (.reset) + (.update bytes))] + (.longValue (BigInteger. 1 (.digest algorithm))))) + +(defn shift-once + ^long [^long l] + (bit-shift-left l 1)) + +(defn shift-twice + ^long [^long l] + (bit-shift-left l 2)) + +(def lookups + (let [gear (mapv random-long (range 255))] + {:gear gear + :shift (mapv shift-once gear)})) + +(defprotocol RabinHashable + (-hash-seq "returns a lazy-seq of chunks for this" [this ctx])) + +(defn drain + [n src dest] + (let [bytes (.readNBytes src n)] + (.write dest bytes) + bytes)) + +(defn- chunker + [^BufferedInputStream this {:size/keys [expected min max] + :keys [pos normalization]}] + (let [whats-left (.avaliable this) + bits (quot (math/log10 expected) (math/log10 2)) + masks {:small (masks (+ bits normalization)) + :large (masks (- bits normalization))}] + (if (<= whats-left min) + {:data (.readNBytes this whats-left) + :fingerprint 0 + :offset pos + :length whats-left} + (with-open [baos (ByteArrayOutputStream.)] + (let [upper-bound (if (> whats-left max) max whats-left) + normal (if (< whats-left expected) whats-left expected) + normal-cutoff (quot normal 2)] + (drain min this baos) + (reduce (fn [fingerprint i] + (let [mask (if (< i normal-cutoff) + (masks :small) + (masks :large)) + offset (* 2 i) + byte (drain 1 this baos) + fingerprint (+ (shift-twice fingerprint) (get-in lookups [:shift byte]))] + (if (zero? (bit-and fingerprint mask)) + (reduced + {:data (.toByteArray baos) + :fingerprint fingerprint + :offset pos + :length offset}) + (let [offset (inc offset) + byte (drain 1 this baos) + fingerprint (+ fingerprint (get-in lookups [:gear byte]))] + (if (map? chunk) + (reduced + {:data (.toByteArray baos) + :fingerprint fingerprint + :offset pos + :length offset}) + chunk))))) + 0 + (range (quot min 2) (quot upper-bound 2))))))) + + (extend BufferedInputStream + RabinHashable + {:-hash-seq + (fn [^BufferedInputStream this {:keys [chunk-fn] :or {chunk-fn chunker} :as ctx}] + (lazy-seq + (when (pos? (.available this)) + (when-let [{:keys [length] :as chunk} (chunk-fn this ctx)] + (cons chunk (-hash-seq this (update ctx :pos + length)))))))})) + +(extend InputStream + RabinHashable + {:-hash-seq + (fn [^InputStream this ctx] + (-hash-seq (BufferedInputStream. this) ctx))}) + +(extend bytes + RabinHashable + {:-hash-seq + (fn [^bytes this ctx] + (-hash-seq (ByteArrayInputStream. this) ctx))}) + +(extend File + RabinHashable + {:-hash-seq + (fn [^File this ctx] + (when (and (.exists this) (.isFile this)) + (-hash-seq (io/input-stream this) ctx)))}) + +(extend String + RabinHashable + {:-hash-seq + (fn [^String this ctx] + (-hash-seq (.getBytes this) ctx))}) From 4304c1fbec654be10985a5f787c9e00f6a4a1682 Mon Sep 17 00:00:00 2001 From: Darin Douglass Date: Wed, 24 Jul 2024 10:55:06 -0400 Subject: [PATCH 2/5] small fixes to make the ns actual load --- src/clj_rabin/fast.clj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/clj_rabin/fast.clj b/src/clj_rabin/fast.clj index 13cc819..380ddd4 100644 --- a/src/clj_rabin/fast.clj +++ b/src/clj_rabin/fast.clj @@ -55,7 +55,7 @@ :shift (mapv shift-once gear)})) (defprotocol RabinHashable - (-hash-seq "returns a lazy-seq of chunks for this" [this ctx])) + (-hash-seq [this ctx] "returns a lazy-seq of chunks for this")) (defn drain [n src dest] @@ -121,7 +121,7 @@ (fn [^InputStream this ctx] (-hash-seq (BufferedInputStream. this) ctx))}) -(extend bytes +(extend (class (make-array Byte/TYPE 0)) RabinHashable {:-hash-seq (fn [^bytes this ctx] From f94ffd3fbcebebfa2021dd494e797ba2a69dc212 Mon Sep 17 00:00:00 2001 From: Darin Douglass Date: Wed, 24 Jul 2024 11:34:04 -0400 Subject: [PATCH 3/5] more updates --- src/clj_rabin/fast.clj | 48 ++++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/src/clj_rabin/fast.clj b/src/clj_rabin/fast.clj index 380ddd4..03ee9f9 100644 --- a/src/clj_rabin/fast.clj +++ b/src/clj_rabin/fast.clj @@ -59,15 +59,20 @@ (defn drain [n src dest] - (let [bytes (.readNBytes src n)] + (let [bytes (if (= 1 n) (.read src) (.readNBytes src n))] (.write dest bytes) bytes)) (defn- chunker - [^BufferedInputStream this {:size/keys [expected min max] - :keys [pos normalization]}] - (let [whats-left (.avaliable this) - bits (quot (math/log10 expected) (math/log10 2)) + [^BufferedInputStream this {:size/keys [min average max] + :keys [pos normalization] + :or {pos 0 + normalization 0 + min 64 + average 256 + max 1024}}] + (let [whats-left (.available this) + bits (int (quot (math/log10 average) (math/log10 2))) masks {:small (masks (+ bits normalization)) :large (masks (- bits normalization))}] (if (<= whats-left min) @@ -77,7 +82,7 @@ :length whats-left} (with-open [baos (ByteArrayOutputStream.)] (let [upper-bound (if (> whats-left max) max whats-left) - normal (if (< whats-left expected) whats-left expected) + normal (if (< whats-left average) whats-left average) normal-cutoff (quot normal 2)] (drain min this baos) (reduce (fn [fingerprint i] @@ -86,7 +91,8 @@ (masks :large)) offset (* 2 i) byte (drain 1 this baos) - fingerprint (+ (shift-twice fingerprint) (get-in lookups [:shift byte]))] + _ (prn :shift-stage offset byte fingerprint) + fingerprint (unchecked-add (shift-twice fingerprint) (get-in lookups [:shift byte]))] (if (zero? (bit-and fingerprint mask)) (reduced {:data (.toByteArray baos) @@ -95,42 +101,47 @@ :length offset}) (let [offset (inc offset) byte (drain 1 this baos) - fingerprint (+ fingerprint (get-in lookups [:gear byte]))] - (if (map? chunk) + fingerprint (unchecked-add fingerprint (get-in lookups [:gear byte]))] + (if (zero? (bit-and fingerprint mask)) (reduced {:data (.toByteArray baos) :fingerprint fingerprint :offset pos :length offset}) - chunk))))) + fingerprint))))) 0 - (range (quot min 2) (quot upper-bound 2))))))) + (range (quot min 2) (quot upper-bound 2)))))))) (extend BufferedInputStream - RabinHashable - {:-hash-seq - (fn [^BufferedInputStream this {:keys [chunk-fn] :or {chunk-fn chunker} :as ctx}] - (lazy-seq - (when (pos? (.available this)) - (when-let [{:keys [length] :as chunk} (chunk-fn this ctx)] - (cons chunk (-hash-seq this (update ctx :pos + length)))))))})) + RabinHashable + {:-hash-seq + (fn [^BufferedInputStream this {:keys [chunk-fn] :or {chunk-fn chunker} :as ctx}] + (prn :bis) + (lazy-seq + (when (pos? (.available this)) + (when-let [{:keys [length] :as chunk} (chunk-fn this ctx)] + (prn chunk) + (cons chunk (-hash-seq this (update ctx :pos (fnil + 0) length)))))))}) (extend InputStream RabinHashable {:-hash-seq (fn [^InputStream this ctx] + (prn :stream) (-hash-seq (BufferedInputStream. this) ctx))}) (extend (class (make-array Byte/TYPE 0)) RabinHashable {:-hash-seq (fn [^bytes this ctx] + (prn :bytes) (-hash-seq (ByteArrayInputStream. this) ctx))}) (extend File RabinHashable {:-hash-seq (fn [^File this ctx] + (prn :file) (when (and (.exists this) (.isFile this)) (-hash-seq (io/input-stream this) ctx)))}) @@ -138,4 +149,5 @@ RabinHashable {:-hash-seq (fn [^String this ctx] + (prn :string) (-hash-seq (.getBytes this) ctx))}) From ecd678a84f2b166ccea7222f819f931c84b185df Mon Sep 17 00:00:00 2001 From: Darin Douglass Date: Wed, 24 Jul 2024 11:47:09 -0400 Subject: [PATCH 4/5] more --- src/clj_rabin/fast.clj | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/clj_rabin/fast.clj b/src/clj_rabin/fast.clj index 03ee9f9..5358a74 100644 --- a/src/clj_rabin/fast.clj +++ b/src/clj_rabin/fast.clj @@ -76,14 +76,15 @@ masks {:small (masks (+ bits normalization)) :large (masks (- bits normalization))}] (if (<= whats-left min) - {:data (.readNBytes this whats-left) + {:data (String. (.readNBytes this whats-left)) :fingerprint 0 :offset pos :length whats-left} (with-open [baos (ByteArrayOutputStream.)] (let [upper-bound (if (> whats-left max) max whats-left) normal (if (< whats-left average) whats-left average) - normal-cutoff (quot normal 2)] + normal-cutoff (quot normal 2) + upper-cutoff (int (quot upper-bound 2))] (drain min this baos) (reduce (fn [fingerprint i] (let [mask (if (< i normal-cutoff) @@ -91,23 +92,24 @@ (masks :large)) offset (* 2 i) byte (drain 1 this baos) - _ (prn :shift-stage offset byte fingerprint) + ;; _ (prn :shift-stage offset byte fingerprint) fingerprint (unchecked-add (shift-twice fingerprint) (get-in lookups [:shift byte]))] - (if (zero? (bit-and fingerprint mask)) + (if (or (zero? (bit-and fingerprint mask)) + ;; if we've hit the end, return regardless + (= (dec upper-cutoff) i)) (reduced - {:data (.toByteArray baos) + {:data (String. (.toByteArray baos)) :fingerprint fingerprint :offset pos - :length offset}) - (let [offset (inc offset) - byte (drain 1 this baos) + :length (inc offset)}) + (let [byte (drain 1 this baos) fingerprint (unchecked-add fingerprint (get-in lookups [:gear byte]))] (if (zero? (bit-and fingerprint mask)) (reduced - {:data (.toByteArray baos) + {:data (String. (.toByteArray baos)) :fingerprint fingerprint :offset pos - :length offset}) + :length (+ 2 offset)}) fingerprint))))) 0 (range (quot min 2) (quot upper-bound 2)))))))) @@ -116,32 +118,28 @@ RabinHashable {:-hash-seq (fn [^BufferedInputStream this {:keys [chunk-fn] :or {chunk-fn chunker} :as ctx}] - (prn :bis) (lazy-seq (when (pos? (.available this)) (when-let [{:keys [length] :as chunk} (chunk-fn this ctx)] - (prn chunk) + ;; (prn chunk ctx) (cons chunk (-hash-seq this (update ctx :pos (fnil + 0) length)))))))}) (extend InputStream RabinHashable {:-hash-seq (fn [^InputStream this ctx] - (prn :stream) (-hash-seq (BufferedInputStream. this) ctx))}) (extend (class (make-array Byte/TYPE 0)) RabinHashable {:-hash-seq (fn [^bytes this ctx] - (prn :bytes) (-hash-seq (ByteArrayInputStream. this) ctx))}) (extend File RabinHashable {:-hash-seq (fn [^File this ctx] - (prn :file) (when (and (.exists this) (.isFile this)) (-hash-seq (io/input-stream this) ctx)))}) @@ -149,5 +147,4 @@ RabinHashable {:-hash-seq (fn [^String this ctx] - (prn :string) (-hash-seq (.getBytes this) ctx))}) From db828d7136f773f6c3901b42427f144afa179511 Mon Sep 17 00:00:00 2001 From: Darin Douglass Date: Wed, 24 Jul 2024 15:41:53 -0400 Subject: [PATCH 5/5] cleanup stuffs --- src/clj_rabin/fast.clj | 171 ++++++++++++++++++++++++++++++----------- 1 file changed, 127 insertions(+), 44 deletions(-) diff --git a/src/clj_rabin/fast.clj b/src/clj_rabin/fast.clj index 5358a74..2aedc8b 100644 --- a/src/clj_rabin/fast.clj +++ b/src/clj_rabin/fast.clj @@ -1,4 +1,4 @@ -3M(ns clj-rabin.fast +(ns clj-rabin.fast (:require [clojure.java.io :as io] [clojure.math :as math]) (:import [java.io BufferedInputStream ByteArrayInputStream ByteArrayOutputStream File InputStream] @@ -60,68 +60,134 @@ (defn drain [n src dest] (let [bytes (if (= 1 n) (.read src) (.readNBytes src n))] - (.write dest bytes) - bytes)) + (when-not (= -1 bytes) + (.write dest bytes) + bytes))) + +(defn maskset + [center level] + (let [bits (int (quot (math/log10 center) (math/log10 2))) + ;;_ (prn :avg center :bits bits :n level) + small-mask (masks (+ bits ^long level)) + large-mask (masks (- bits ^long level))] + {:small + {:gear small-mask + :shift (shift-once small-mask)} + + :large + {:gear large-mask + :shift (shift-once large-mask)}})) (defn- chunker - [^BufferedInputStream this {:size/keys [min average max] - :keys [pos normalization] + [^BufferedInputStream this {:size/keys [^long min ^long average ^long max] + :keys [pos ^long normalization] :or {pos 0 normalization 0 min 64 average 256 max 1024}}] (let [whats-left (.available this) - bits (int (quot (math/log10 average) (math/log10 2))) - masks {:small (masks (+ bits normalization)) - :large (masks (- bits normalization))}] + masks (maskset average normalization)] (if (<= whats-left min) - {:data (String. (.readNBytes this whats-left)) + {:data (.readNBytes this whats-left) :fingerprint 0 :offset pos :length whats-left} (with-open [baos (ByteArrayOutputStream.)] - (let [upper-bound (if (> whats-left max) max whats-left) - normal (if (< whats-left average) whats-left average) - normal-cutoff (quot normal 2) - upper-cutoff (int (quot upper-bound 2))] + (let [^long upper-bound (if (> whats-left max) max whats-left) + ^long normal (if (< whats-left average) whats-left average) + data-fn (fn [fingerprint length] + {:data (.toByteArray baos) + :length length + :fingerprint fingerprint + :offset pos})] + #_(prn :remainder whats-left + :min min :avg average :max max + :upper-bound upper-bound + :normal normal + :start (/ min 2) + :end (/ upper-bound 2)) (drain min this baos) - (reduce (fn [fingerprint i] - (let [mask (if (< i normal-cutoff) - (masks :small) - (masks :large)) - offset (* 2 i) - byte (drain 1 this baos) - ;; _ (prn :shift-stage offset byte fingerprint) - fingerprint (unchecked-add (shift-twice fingerprint) (get-in lookups [:shift byte]))] - (if (or (zero? (bit-and fingerprint mask)) - ;; if we've hit the end, return regardless - (= (dec upper-cutoff) i)) - (reduced - {:data (String. (.toByteArray baos)) - :fingerprint fingerprint - :offset pos - :length (inc offset)}) - (let [byte (drain 1 this baos) - fingerprint (unchecked-add fingerprint (get-in lookups [:gear byte]))] - (if (zero? (bit-and fingerprint mask)) - (reduced - {:data (String. (.toByteArray baos)) - :fingerprint fingerprint - :offset pos - :length (+ 2 offset)}) - fingerprint))))) - 0 - (range (quot min 2) (quot upper-bound 2)))))))) - - (extend BufferedInputStream + (loop [fingerprint 0 + length min + [type & rest] (cycle [:shift :gear])] + (if (= length upper-bound) + (data-fn fingerprint length) + (let [mask (if (< length ^long normal) + (masks :small) + (masks :large)) + byte (drain 1 this baos) + length (inc length) + fingerprint (cond-> fingerprint + (#{:shift} type) (shift-twice) + true (unchecked-add ^long (get-in lookups [type byte])))] + (if (zero? (bit-and fingerprint ^long (mask type))) + (data-fn fingerprint length) + (recur fingerprint length rest))))) + + #_(reduce + (fn [{:keys [fingerprint length] :as acc} i] + (prn :i i :l length :b upper-bound :r (.available this)) + (if (= length upper-bound) + (reduced (assoc acc :data (.toByteArray baos) :length length)) + (let [mask (if (< length ^long normal) + (masks :small) + (masks :large)) + byte (drain 1 this baos) + length (inc length) + fingerprint (unchecked-add (shift-twice fingerprint) ^long (get-in lookups [:shift byte])) + ;;_ (prn :shift-stage length byte fingerprint (mask :shift) (zero? (bit-and fingerprint ^long (mask :shift)))) + ] + (if (or (zero? (bit-and fingerprint ^long (mask :shift))) + (zero? (.available this))) + (reduced (assoc acc :data (.toByteArray baos) :fingerprint fingerprint :length length)) + (let [byte (drain 1 this baos) + length (inc length) + ;; _ (prn fingerprint byte) + fingerprint (unchecked-add fingerprint ^long (get-in lookups [:gear byte]))] + (if (zero? (bit-and fingerprint (mask :normal))) + (reduced (assoc acc :data (.toByteArray baos) :fingerprint fingerprint :length length)) + (assoc acc :fingerprint fingerprint :length length))))))) + {:fingerprint 0 :offset pos :length min} + (range)) + + #_(reduce (fn [fingerprint ^long i] + (let [^long mask (if (< i ^long normal-cutoff) + (masks :small) + (masks :large)) + offset (* 2 i) + byte (drain 1 this baos) + ;; _ (prn :shift-stage offset byte fingerprint) + fingerprint (unchecked-add (shift-twice fingerprint) ^long (get-in lookups [:shift byte]))] + (if (or (zero? (bit-and fingerprint mask)) + ;; if we've hit the end, return regardless + (>= (dec upper-cutoff) offset)) + (reduced + {:data (.toByteArray baos) + :fingerprint fingerprint + :offset pos + :length (inc offset)}) + (let [byte (drain 1 this baos) + ;;_ (prn fingerprint byte) + fingerprint (unchecked-add fingerprint ^long (get-in lookups [:gear byte]))] + (if (zero? (bit-and fingerprint mask)) + (reduced + {:data (.toByteArray baos) + :fingerprint fingerprint + :offset pos + :length (+ 2 offset)}) + fingerprint))))) + 0 + indices)))))) + +(extend BufferedInputStream RabinHashable {:-hash-seq (fn [^BufferedInputStream this {:keys [chunk-fn] :or {chunk-fn chunker} :as ctx}] (lazy-seq (when (pos? (.available this)) (when-let [{:keys [length] :as chunk} (chunk-fn this ctx)] - ;; (prn chunk ctx) + #_(prn chunk ctx) (cons chunk (-hash-seq this (update ctx :pos (fnil + 0) length)))))))}) (extend InputStream @@ -141,10 +207,27 @@ {:-hash-seq (fn [^File this ctx] (when (and (.exists this) (.isFile this)) - (-hash-seq (io/input-stream this) ctx)))}) + #_(prn :f this (.length this)) + (try + (doall (-hash-seq (io/input-stream this) ctx)) + (catch Exception e + (prn this e)))))}) (extend String RabinHashable {:-hash-seq (fn [^String this ctx] (-hash-seq (.getBytes this) ctx))}) + +(comment + "data/enron/maildir/arnold-j/deleted_items/397." + "data/enron/maildir/arnold-j/inbox/34." + "data/enron/maildir/arnold-j/discussion_threads/81." + "data/enron/maildir/arnold-j/vulcan_signs/2." + + "data/enron/maildir/arnold-j/notes_inbox/9." + + (def f (io/file "/Users/ddouglass/src/clj-rabin/data/enron/maildir/arnold-j/vulcan_signs/2.")) + (-hash-seq f {}) + + nil)