From 232e6d1a21c57c9fc825deeaa90c50594cf60c4c Mon Sep 17 00:00:00 2001 From: Paddy Xu Date: Wed, 17 Jul 2019 12:30:18 +0300 Subject: [PATCH] Implement DP prefix reduction --- .../helsinki/cs/udbms/AdaptivePrefixFilter.kt | 5 +- .../udbms/DynamicProgrammingPebbleReducer.kt | 103 ++++++++++++++++++ .../fi/helsinki/cs/udbms/FastPebbleReducer.kt | 26 +++-- .../kotlin/fi/helsinki/cs/udbms/MainJoin.kt | 50 ++++++--- .../helsinki/cs/udbms/SimilarityVerifier.kt | 6 +- .../helsinki/cs/udbms/struct/GlobalOrder.kt | 15 ++- .../fi/helsinki/cs/udbms/struct/Segment.kt | 1 + .../cs/udbms/struct/SegmentedString.kt | 7 +- .../cs/udbms/util/RuntimeParameters.kt | 10 +- 9 files changed, 183 insertions(+), 40 deletions(-) create mode 100644 src/main/kotlin/fi/helsinki/cs/udbms/DynamicProgrammingPebbleReducer.kt diff --git a/src/main/kotlin/fi/helsinki/cs/udbms/AdaptivePrefixFilter.kt b/src/main/kotlin/fi/helsinki/cs/udbms/AdaptivePrefixFilter.kt index e2a60a6..7fe8509 100644 --- a/src/main/kotlin/fi/helsinki/cs/udbms/AdaptivePrefixFilter.kt +++ b/src/main/kotlin/fi/helsinki/cs/udbms/AdaptivePrefixFilter.kt @@ -55,10 +55,13 @@ class AdaptivePrefixFilter(private val threshold: Double, private val overlap: I < threshold * max(string1.minPartitionSize, string2.minPartitionSize) ) return@nextString2 - // check if either segment1 or segment2 is used by any other pair + // check if either segment1 or segment2 is used by any other pebble pair val used = usedSegments.getOrPut(string2, { mutableSetOf() }) if (used.contains(segment1) xor used.contains(segment2)) return@nextString2 + // also check for conflict segments + if (used.intersect(segment1.conflictSegments).isNotEmpty() || used.intersect(segment2.conflictSegments).isNotEmpty()) + return@nextString2 // mark segment1 and segment2 as used for string2 used.add(segment1); used.add(segment2) diff --git a/src/main/kotlin/fi/helsinki/cs/udbms/DynamicProgrammingPebbleReducer.kt b/src/main/kotlin/fi/helsinki/cs/udbms/DynamicProgrammingPebbleReducer.kt new file mode 100644 index 0000000..d4642c0 --- /dev/null +++ b/src/main/kotlin/fi/helsinki/cs/udbms/DynamicProgrammingPebbleReducer.kt @@ -0,0 +1,103 @@ +// +// MIT License +// +// Copyright (c) 2019 pengxu +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// + +package fi.helsinki.cs.udbms + +import fi.helsinki.cs.udbms.struct.* + +class DynamicProgrammingPebbleReducer(threshold: Double, overlap: Int, order: GlobalOrder) : + PebbleReducer(threshold, overlap, order) { + override fun reduce(str: SegmentedString, pebbles: Iterable): List { + val bound = threshold * str.minPartitionSize + val pebblesSorted = pebbles.sortedBy { order.getOrder(it) } + + val store = mutableMapOf>() + var accSim = 0.0 + + var i = pebblesSorted.size - 1 + while (i > 0) { + val dp = Array(str.segments.size + 1) { Array(overlap) { 0.0 } } + val acc = Array(str.segments.size + 1) { Array(overlap) { 0.0 } } + + (1 until str.segments.size).forEach { p -> + // fill V[p,_] by Equation 13 + (1 until overlap).map { c -> + acc[p][c] = getR(pebblesSorted, str.segments[p], i, c) - getR(pebblesSorted, str.segments[p], i, 0) + } + + (1 until overlap).forEach { d -> + dp[p][d] = (0..d).map { c -> dp[p - 1][d - c] + acc[p][c] }.max() ?: 0.0 + if (accSim + dp[p][d] >= bound) + return pebblesSorted.slice(0..i) // early termination + } + } + + // refresh accSim + val it = pebblesSorted[i] + store.putIfAbsent(it.segment, Array((KnowledgeType.values().map { it.id }.max() ?: 0) + 1) { 0.0 }) + accSim -= store[it.segment]?.max() ?: 0.0 + + val old = store[it.segment]?.get(it.type.id) ?: 0.0 + store[it.segment]?.set(it.type.id, old + it.weight) + + accSim += store[it.segment]?.max() ?: 0.0 + + i-- + } + + return pebblesSorted.take(1) + } + + // Equation 14 + private fun getR( + pebblesSorted: List, + p: Segment, + i: Int, + c: Int + ): Double { + return KnowledgeType.values().map { f -> + val r1 = pebblesSorted + .slice(i until pebblesSorted.size) + .asSequence() + .filter { it.segment == p } + .filter { it.type == f } + .map { it.weight } + .sum() + + if (c == 0) return@map r1 + + val r2 = pebblesSorted + .slice(0 until i) + .asSequence() + .filter { it.segment == p } + .filter { it.type == f } + .map { it.weight } + .sortedDescending() + .take(c) + .sum() + + return@map r1 + r2 + }.max() ?: 0.0 + } +} \ No newline at end of file diff --git a/src/main/kotlin/fi/helsinki/cs/udbms/FastPebbleReducer.kt b/src/main/kotlin/fi/helsinki/cs/udbms/FastPebbleReducer.kt index 97b4e7a..658fe63 100644 --- a/src/main/kotlin/fi/helsinki/cs/udbms/FastPebbleReducer.kt +++ b/src/main/kotlin/fi/helsinki/cs/udbms/FastPebbleReducer.kt @@ -30,32 +30,34 @@ class FastPebbleReducer(threshold: Double, overlap: Int, order: GlobalOrder) : PebbleReducer(threshold, overlap, order) { override fun reduce(str: SegmentedString, pebbles: Iterable): List { val bound = threshold * str.minPartitionSize - val pebblesSorted = pebbles.sortedByDescending { order.getOrder(it) }.toMutableList() + val pebbleRemaining = pebbles.sortedByDescending { order.getOrder(it) }.toMutableList() val store = mutableMapOf>() - var removedSim = 0.0 + var accSim = 0.0 - while (pebblesSorted.size > 1) { - val it = pebblesSorted.first() - pebblesSorted.removeAt(0) // delete the first pebble + while (pebbleRemaining.size > 1) { + val it = pebbleRemaining.first() + pebbleRemaining.removeAt(0) // delete the first pebble - store.putIfAbsent(it.segment, Array(KnowledgeType.values().size) { 0.0 }) + store.putIfAbsent(it.segment, Array((KnowledgeType.values().map { it.id }.max() ?: 0) + 1) { 0.0 }) - removedSim -= store[it.segment]?.max() ?: 0.0 + accSim -= store[it.segment]?.max() ?: 0.0 val old = store[it.segment]?.get(it.type.id) ?: 0.0 store[it.segment]?.set(it.type.id, old + it.weight) - removedSim += store[it.segment]?.max() ?: 0.0 + accSim += store[it.segment]?.max() ?: 0.0 - val futureSim = pebblesSorted.take(overlap - 1).sumByDouble { it.weight } // [overlap-1] heaviest pebbles + // [overlap-1] heaviest pebbles + val futureSim = pebbleRemaining.map { it.weight }.sortedDescending().take(overlap - 1).sum() - if (removedSim + futureSim >= bound) { // if too much, add it back and stop - pebblesSorted.add(0, it) + if (accSim + futureSim >= bound) { // if too much, add it back and stop + pebbleRemaining.add(0, it) break } + if (pebbleRemaining.isEmpty()) return listOf(it) } - return pebblesSorted.toList() + return pebbleRemaining.toList() } } \ No newline at end of file diff --git a/src/main/kotlin/fi/helsinki/cs/udbms/MainJoin.kt b/src/main/kotlin/fi/helsinki/cs/udbms/MainJoin.kt index 05b4903..e16f555 100644 --- a/src/main/kotlin/fi/helsinki/cs/udbms/MainJoin.kt +++ b/src/main/kotlin/fi/helsinki/cs/udbms/MainJoin.kt @@ -70,9 +70,16 @@ fun main(args: Array) = mainBody { order.addAll(pebbles1.values.flatten()) order.addAll(pebbles2.values.flatten()) + /*=================================================================*/ + + val reducer = when (params.filter) { + "Fast" -> FastPebbleReducer(params.threshold, params.overlap, order) + "DP" -> DynamicProgrammingPebbleReducer(params.threshold, params.overlap, order) + else -> throw Exception("Invalid filtering method: ${params.filter}") + } + print("Selecting prefixes... ") val time = measureTimeMillis { - val reducer = FastPebbleReducer(params.threshold, params.overlap, order) signatures1 = list1.mapParallel { Pair(it, reducer.reduce(it, pebbles1[it] ?: emptyList())) }.toMap() signatures2 = list2.mapParallel { Pair(it, reducer.reduce(it, pebbles2[it] ?: emptyList())) }.toMap() } @@ -82,15 +89,12 @@ fun main(args: Array) = mainBody { /*=================================================================*/ println("Building inverted list... ") - //val index1 = InvertedIndex() - //signatures1.map { str -> str.value.map { p -> index1.add(p, p.segment) } } - val index2 = InvertedIndex() signatures2.map { str -> str.value.map { p -> index2.add(p, p.segment) } } /*=================================================================*/ - print("Filtering on ${if (params.singleThread) "a single thread" else "multiple threads"}... ") + print("Filtering using ${params.filter} on ${if (params.singleThread) "a single thread" else "multiple threads"}... ") var candidates: List = emptyList() var time = measureTimeMillis { candidates = AdaptivePrefixFilter(params.threshold, params.overlap).getCandidates(signatures1, index2) @@ -100,9 +104,10 @@ fun main(args: Array) = mainBody { /*=================================================================*/ val verifier = when (params.verify) { + "Greedy" -> GreedySimilarityVerifier(params.threshold, syn, tax, params.gram) "SquareImp" -> SquareImpSimilarityVerifier(params.threshold, syn, tax, params.gram) "SquareImp-Improved" -> SquareImpSimilarityVerifier(params.threshold, syn, tax, params.gram, true) - else -> GreedySimilarityVerifier(params.threshold, syn, tax, params.gram) + else -> throw Exception("Invalid verification method: ${params.verify}") } print("Verifying using ${params.verify} on ${if (params.singleThread) "a single thread" else "multiple threads"}... ") @@ -110,32 +115,41 @@ fun main(args: Array) = mainBody { time = measureTimeMillis { results = candidates.mapParallelOrSequential { Pair(it, verifier.getSimilarity(it.first, it.second)) } - .filter { it.second.endInclusive >= params.threshold } + .filter { it.second.start >= params.threshold } .toList() } println("${results.size} results obtained in $time ms") /*=================================================================*/ - val bw: BufferedWriter? = if (params.output.isNotEmpty()) File(params.output).bufferedWriter() else null - if (bw != null) { - println("Writing results to ${params.output}... ") - bw.write("string_1,string_2,sim_min,sim_max") - bw.newLine() - } else println() + val bw: BufferedWriter? = when (params.output) { + "null" -> null + "stdout" -> System.out.bufferedWriter() + else -> { + print("Writing results to ${params.output}... ") + val w = File(params.output).bufferedWriter() + w.write("string_1,string_2,sim_min,sim_max") + w.newLine() + w + } + } + println() results.sortedBy { it.first.second.id }.sortedBy { it.first.first.id }.withIndex().forEach { val str = it.value.first val sim = it.value.second - if (bw == null) { - println( + when (params.output) { + "null" -> { + } + "stdout" -> println( " ${it.index}: " + "(${str.first.id}, ${str.second.id}) has similarity " + "[${sim.start.format(3)}, ${sim.endInclusive.format(3)}]" ) - } else { - bw.write("${str.first.id},${str.second.id},${sim.start},${sim.endInclusive}") - bw.newLine() + else -> { + bw!!.write("${str.first.id},${str.second.id},${sim.start},${sim.endInclusive}") + bw.newLine() + } } } diff --git a/src/main/kotlin/fi/helsinki/cs/udbms/SimilarityVerifier.kt b/src/main/kotlin/fi/helsinki/cs/udbms/SimilarityVerifier.kt index 6606861..9cf7669 100644 --- a/src/main/kotlin/fi/helsinki/cs/udbms/SimilarityVerifier.kt +++ b/src/main/kotlin/fi/helsinki/cs/udbms/SimilarityVerifier.kt @@ -159,8 +159,10 @@ abstract class SimilarityVerifier( val conflicts = vertices .filterNot { it == centre } .filter { - it.relation.seg1.wordIds.intersect(centre.relation.seg1.wordIds).isNotEmpty() - || it.relation.seg2.wordIds.intersect(centre.relation.seg2.wordIds).isNotEmpty() + it.relation.seg1 == centre.relation.seg1 + || it.relation.seg2 == centre.relation.seg2 + || it.relation.seg1.conflictSegments.contains(centre.relation.seg1) + || it.relation.seg2.conflictSegments.contains(centre.relation.seg2) } centre.neighbours.addAll(conflicts) k = max(k, conflicts.count()) diff --git a/src/main/kotlin/fi/helsinki/cs/udbms/struct/GlobalOrder.kt b/src/main/kotlin/fi/helsinki/cs/udbms/struct/GlobalOrder.kt index c7c3adf..53ce881 100644 --- a/src/main/kotlin/fi/helsinki/cs/udbms/struct/GlobalOrder.kt +++ b/src/main/kotlin/fi/helsinki/cs/udbms/struct/GlobalOrder.kt @@ -25,11 +25,20 @@ package fi.helsinki.cs.udbms.struct class GlobalOrder { - private var order = mutableMapOf() + private var frequency = mutableMapOf() + private var order = mapOf() fun addAll(pebbles: Iterable) { - order.putAll(pebbles.groupingBy { it }.eachCount()) + pebbles.groupingBy { it }.eachCount().forEach { (it, count) -> + val freq = frequency.getOrDefault(it, 0) + frequency[it] = freq + count + } + + // re-sort all pebbles + order = frequency.keys.sortedBy { frequency[it] }.withIndex().associate { Pair(it.value, it.index) } } - fun getOrder(p: Pebble) = order[p] ?: Int.MIN_VALUE + fun getOrder(p: Pebble): Int { + return order[p] ?: Int.MIN_VALUE + } } \ No newline at end of file diff --git a/src/main/kotlin/fi/helsinki/cs/udbms/struct/Segment.kt b/src/main/kotlin/fi/helsinki/cs/udbms/struct/Segment.kt index ae393f1..290f689 100644 --- a/src/main/kotlin/fi/helsinki/cs/udbms/struct/Segment.kt +++ b/src/main/kotlin/fi/helsinki/cs/udbms/struct/Segment.kt @@ -26,6 +26,7 @@ package fi.helsinki.cs.udbms.struct class Segment(label: String) { var segmentedString: SegmentedString? = null + var conflictSegments = emptySet() val label = label.split(' ').joinToString(separator = " ") { it.substring(it.indexOf(':') + 1) } val wordIds = label.split(' ').map { it.substring(0, it.indexOf(':')).toInt() } val numberOfWords = wordIds.size diff --git a/src/main/kotlin/fi/helsinki/cs/udbms/struct/SegmentedString.kt b/src/main/kotlin/fi/helsinki/cs/udbms/struct/SegmentedString.kt index c33f038..d5ff0c0 100644 --- a/src/main/kotlin/fi/helsinki/cs/udbms/struct/SegmentedString.kt +++ b/src/main/kotlin/fi/helsinki/cs/udbms/struct/SegmentedString.kt @@ -35,12 +35,15 @@ class SegmentedString(val id: Int, val segments: List) { constructor(id: Int, segments: List, dummy: Unit) : this(id, segments.map { Segment(it) }) { this.segments.forEach { it.segmentedString = this } + this.segments.forEach { + it.conflictSegments = this.segments + .filterNot { other -> other == it } + .filter { other -> other.wordIds.intersect(it.wordIds).isNotEmpty() }.toSet() + } } fun unionSegments(): String = segments.joinToString(separator = ";") - private fun unionAllSegments(): String = segments.joinToString(separator = ";") - override fun toString() = "[$id] $numberOfTokens tokens" private fun calculateMinPartitionSize(): Int { diff --git a/src/main/kotlin/fi/helsinki/cs/udbms/util/RuntimeParameters.kt b/src/main/kotlin/fi/helsinki/cs/udbms/util/RuntimeParameters.kt index 700e8d7..641df19 100644 --- a/src/main/kotlin/fi/helsinki/cs/udbms/util/RuntimeParameters.kt +++ b/src/main/kotlin/fi/helsinki/cs/udbms/util/RuntimeParameters.kt @@ -79,6 +79,12 @@ class RuntimeParameters(parser: ArgParser) { if (value < 1) throw InvalidArgumentException("Number of common signatures must be at least 1") } + val filter by parser.mapping( + "--filter-fast" to "Fast", + "--filter-dp" to "DP", + help = "Specify the filtering method: Fast (Heuristic) and DP (Dynamic Programming) (default: --filter-fast)" + ).default { "Fast" } + val verify by parser.mapping( "--verify-greedy" to "Greedy", "--verify-squareimp" to "SquareImp", @@ -93,8 +99,8 @@ class RuntimeParameters(parser: ArgParser) { val output by parser.storing( "-o", "--output", - help = "name of file for writing join results (default: to stdout)" - ).default("") + help = "method for handling join results: null (no output), stdout (to standard output), or a filename (output as csv) (default: -o null)" + ).default("null") val threshold by parser.positional( "THRESHOLD",