Skip to content

Commit

Permalink
Implement DP prefix reduction
Browse files Browse the repository at this point in the history
  • Loading branch information
xupefei committed Jul 17, 2019
1 parent 80aae9c commit 232e6d1
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 40 deletions.
5 changes: 4 additions & 1 deletion src/main/kotlin/fi/helsinki/cs/udbms/AdaptivePrefixFilter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,13 @@ class AdaptivePrefixFilter(private val threshold: Double, private val overlap: I
< threshold * max(string1.minPartitionSize, string2.minPartitionSize)
) return@nextString2

// check if either segment1 or segment2 is used by any other pair
// check if either segment1 or segment2 is used by any other pebble pair
val used = usedSegments.getOrPut(string2, { mutableSetOf() })
if (used.contains(segment1) xor used.contains(segment2))
return@nextString2
// also check for conflict segments
if (used.intersect(segment1.conflictSegments).isNotEmpty() || used.intersect(segment2.conflictSegments).isNotEmpty())
return@nextString2

// mark segment1 and segment2 as used for string2
used.add(segment1); used.add(segment2)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//
// MIT License
//
// Copyright (c) 2019 pengxu
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//

package fi.helsinki.cs.udbms

import fi.helsinki.cs.udbms.struct.*

/**
 * Pebble reducer that decides where to cut the pebble list with a dynamic program.
 *
 * Pebbles are sorted ascending by [GlobalOrder.getOrder] and scanned from the last index
 * towards the front. At every step a table over (segment, budget) is filled and the scan
 * stops as soon as the accumulated weight of the already scanned tail plus the best table
 * entry reaches `threshold * str.minPartitionSize`.
 *
 * NOTE(review): the "Equation 13/14" comments are carried over from the original source and
 * presumably refer to the paper this implementation follows — confirm against it.
 */
class DynamicProgrammingPebbleReducer(threshold: Double, overlap: Int, order: GlobalOrder) :
    PebbleReducer(threshold, overlap, order) {

    /**
     * Selects the leading part of the sorted pebble list of [str].
     *
     * @param str the string whose segments index the DP table and whose `minPartitionSize`
     *   scales the bound.
     * @param pebbles the pebbles to reduce; they are sorted here, the caller's order is irrelevant.
     * @return `pebblesSorted[0..i]` for the first scan position `i` at which the bound is
     *   reached, otherwise the first sorted pebble only (an empty list for empty input).
     */
    override fun reduce(str: SegmentedString, pebbles: Iterable<Pebble>): List<Pebble> {
        val bound = threshold * str.minPartitionSize
        val pebblesSorted = pebbles.sortedBy { order.getOrder(it) }
        val segmentCount = str.segments.size

        // Per-segment weight arrays are indexed by KnowledgeType.id, so they need
        // (largest id + 1) slots. Loop-invariant, hence computed once.
        val typeSlots = (KnowledgeType.values().map { type -> type.id }.max() ?: 0) + 1

        // store[segment][typeId] = total weight of the scanned pebbles of that segment/type;
        // accSim is kept equal to the sum over segments of the per-segment maximum.
        val store = mutableMapOf<Segment, DoubleArray>()
        var accSim = 0.0

        // NOTE(review): with overlap == 1 the budget loops below are empty, so the bound is
        // never checked and the result is always the first pebble — confirm this is intended.
        var i = pebblesSorted.size - 1
        while (i > 0) {
            // Row 0 is the all-zero base case; row p belongs to str.segments[p - 1].
            val dp = Array(segmentCount + 1) { DoubleArray(overlap) }
            val acc = Array(segmentCount + 1) { DoubleArray(overlap) }

            // Rows run over 1..segmentCount. The previous `1 until size` combined with
            // `segments[p]` skipped the first segment and left the last row unused.
            for (p in 1..segmentCount) {
                val segment = str.segments[p - 1]

                // fill V[p,_] by Equation 13
                val base = getR(pebblesSorted, segment, i, 0) // invariant over c
                for (c in 1 until overlap) {
                    acc[p][c] = getR(pebblesSorted, segment, i, c) - base
                }

                for (d in 1 until overlap) {
                    // Best split of budget d between this segment (c) and the earlier rows (d - c).
                    var best = Double.NEGATIVE_INFINITY
                    for (c in 0..d) {
                        best = maxOf(best, dp[p - 1][d - c] + acc[p][c])
                    }
                    dp[p][d] = best

                    if (accSim + best >= bound) {
                        return pebblesSorted.slice(0..i) // early termination
                    }
                }
            }

            // refresh accSim: swap this segment's old maximum for its new one
            val pebble = pebblesSorted[i]
            val weights = store.getOrPut(pebble.segment) { DoubleArray(typeSlots) }
            accSim -= weights.max() ?: 0.0
            weights[pebble.type.id] += pebble.weight
            accSim += weights.max() ?: 0.0

            i--
        }

        return pebblesSorted.take(1)
    }

    /**
     * Equation 14.
     *
     * For every knowledge type, adds the weights of the pebbles of segment [p] at indices
     * `>= i` and, when [c] is positive, the [c] largest weights of the pebbles of segment [p]
     * at indices `< i`; returns the largest such total over all types (0.0 if there are no types).
     *
     * @param pebblesSorted the globally sorted pebble list.
     * @param p the segment whose pebbles are considered.
     * @param i split index into [pebblesSorted]; must lie in `0..pebblesSorted.size`.
     * @param c how many pebbles in front of the split may be counted in addition.
     */
    private fun getR(
        pebblesSorted: List<Pebble>,
        p: Segment,
        i: Int,
        c: Int
    ): Double {
        // Restrict to segment p once instead of once per knowledge type.
        val tail = pebblesSorted.subList(i, pebblesSorted.size).filter { it.segment == p }
        val head =
            if (c == 0) emptyList()
            else pebblesSorted.subList(0, i).filter { it.segment == p }

        return KnowledgeType.values().map { f ->
            val r1 = tail.filter { it.type == f }.map { it.weight }.sum()

            if (c == 0) return@map r1

            val r2 = head
                .filter { it.type == f }
                .map { it.weight }
                .sortedDescending()
                .take(c)
                .sum()

            r1 + r2
        }.max() ?: 0.0
    }
}
26 changes: 14 additions & 12 deletions src/main/kotlin/fi/helsinki/cs/udbms/FastPebbleReducer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,34 @@ class FastPebbleReducer(threshold: Double, overlap: Int, order: GlobalOrder) :
PebbleReducer(threshold, overlap, order) {
override fun reduce(str: SegmentedString, pebbles: Iterable<Pebble>): List<Pebble> {
val bound = threshold * str.minPartitionSize
val pebblesSorted = pebbles.sortedByDescending { order.getOrder(it) }.toMutableList()
val pebbleRemaining = pebbles.sortedByDescending { order.getOrder(it) }.toMutableList()

val store = mutableMapOf<Segment, Array<Double>>()
var removedSim = 0.0
var accSim = 0.0

while (pebblesSorted.size > 1) {
val it = pebblesSorted.first()
pebblesSorted.removeAt(0) // delete the first pebble
while (pebbleRemaining.size > 1) {
val it = pebbleRemaining.first()
pebbleRemaining.removeAt(0) // delete the first pebble

store.putIfAbsent(it.segment, Array(KnowledgeType.values().size) { 0.0 })
store.putIfAbsent(it.segment, Array((KnowledgeType.values().map { it.id }.max() ?: 0) + 1) { 0.0 })

removedSim -= store[it.segment]?.max() ?: 0.0
accSim -= store[it.segment]?.max() ?: 0.0

val old = store[it.segment]?.get(it.type.id) ?: 0.0
store[it.segment]?.set(it.type.id, old + it.weight)

removedSim += store[it.segment]?.max() ?: 0.0
accSim += store[it.segment]?.max() ?: 0.0

val futureSim = pebblesSorted.take(overlap - 1).sumByDouble { it.weight } // [overlap-1] heaviest pebbles
// [overlap-1] heaviest pebbles
val futureSim = pebbleRemaining.map { it.weight }.sortedDescending().take(overlap - 1).sum()

if (removedSim + futureSim >= bound) { // if too much, add it back and stop
pebblesSorted.add(0, it)
if (accSim + futureSim >= bound) { // if too much, add it back and stop
pebbleRemaining.add(0, it)
break
}
if (pebbleRemaining.isEmpty()) return listOf(it)
}

return pebblesSorted.toList()
return pebbleRemaining.toList()
}
}
50 changes: 32 additions & 18 deletions src/main/kotlin/fi/helsinki/cs/udbms/MainJoin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,16 @@ fun main(args: Array<String>) = mainBody {
order.addAll(pebbles1.values.flatten())
order.addAll(pebbles2.values.flatten())

/*=================================================================*/

val reducer = when (params.filter) {
"Fast" -> FastPebbleReducer(params.threshold, params.overlap, order)
"DP" -> DynamicProgrammingPebbleReducer(params.threshold, params.overlap, order)
else -> throw Exception("Invalid filtering method: ${params.filter}")
}

print("Selecting prefixes... ")
val time = measureTimeMillis {
val reducer = FastPebbleReducer(params.threshold, params.overlap, order)
signatures1 = list1.mapParallel { Pair(it, reducer.reduce(it, pebbles1[it] ?: emptyList())) }.toMap()
signatures2 = list2.mapParallel { Pair(it, reducer.reduce(it, pebbles2[it] ?: emptyList())) }.toMap()
}
Expand All @@ -82,15 +89,12 @@ fun main(args: Array<String>) = mainBody {
/*=================================================================*/

println("Building inverted list... ")
//val index1 = InvertedIndex()
//signatures1.map { str -> str.value.map { p -> index1.add(p, p.segment) } }

val index2 = InvertedIndex()
signatures2.map { str -> str.value.map { p -> index2.add(p, p.segment) } }

/*=================================================================*/

print("Filtering on ${if (params.singleThread) "a single thread" else "multiple threads"}... ")
print("Filtering using ${params.filter} on ${if (params.singleThread) "a single thread" else "multiple threads"}... ")
var candidates: List<SegmentedStringPair> = emptyList()
var time = measureTimeMillis {
candidates = AdaptivePrefixFilter(params.threshold, params.overlap).getCandidates(signatures1, index2)
Expand All @@ -100,42 +104,52 @@ fun main(args: Array<String>) = mainBody {
/*=================================================================*/

val verifier = when (params.verify) {
"Greedy" -> GreedySimilarityVerifier(params.threshold, syn, tax, params.gram)
"SquareImp" -> SquareImpSimilarityVerifier(params.threshold, syn, tax, params.gram)
"SquareImp-Improved" -> SquareImpSimilarityVerifier(params.threshold, syn, tax, params.gram, true)
else -> GreedySimilarityVerifier(params.threshold, syn, tax, params.gram)
else -> throw Exception("Invalid verification method: ${params.verify}")
}

print("Verifying using ${params.verify} on ${if (params.singleThread) "a single thread" else "multiple threads"}... ")
var results: List<Pair<SegmentedStringPair, ClosedRange<Double>>> = emptyList()
time = measureTimeMillis {
results =
candidates.mapParallelOrSequential { Pair(it, verifier.getSimilarity(it.first, it.second)) }
.filter { it.second.endInclusive >= params.threshold }
.filter { it.second.start >= params.threshold }
.toList()
}
println("${results.size} results obtained in $time ms")

/*=================================================================*/

val bw: BufferedWriter? = if (params.output.isNotEmpty()) File(params.output).bufferedWriter() else null
if (bw != null) {
println("Writing results to ${params.output}... ")
bw.write("string_1,string_2,sim_min,sim_max")
bw.newLine()
} else println()
val bw: BufferedWriter? = when (params.output) {
"null" -> null
"stdout" -> System.out.bufferedWriter()
else -> {
print("Writing results to ${params.output}... ")
val w = File(params.output).bufferedWriter()
w.write("string_1,string_2,sim_min,sim_max")
w.newLine()
w
}
}
println()

results.sortedBy { it.first.second.id }.sortedBy { it.first.first.id }.withIndex().forEach {
val str = it.value.first
val sim = it.value.second
if (bw == null) {
println(
when (params.output) {
"null" -> {
}
"stdout" -> println(
" ${it.index}: "
+ "(${str.first.id}, ${str.second.id}) has similarity "
+ "[${sim.start.format(3)}, ${sim.endInclusive.format(3)}]"
)
} else {
bw.write("${str.first.id},${str.second.id},${sim.start},${sim.endInclusive}")
bw.newLine()
else -> {
bw!!.write("${str.first.id},${str.second.id},${sim.start},${sim.endInclusive}")
bw.newLine()
}
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/main/kotlin/fi/helsinki/cs/udbms/SimilarityVerifier.kt
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,10 @@ abstract class SimilarityVerifier(
val conflicts = vertices
.filterNot { it == centre }
.filter {
it.relation.seg1.wordIds.intersect(centre.relation.seg1.wordIds).isNotEmpty()
|| it.relation.seg2.wordIds.intersect(centre.relation.seg2.wordIds).isNotEmpty()
it.relation.seg1 == centre.relation.seg1
|| it.relation.seg2 == centre.relation.seg2
|| it.relation.seg1.conflictSegments.contains(centre.relation.seg1)
|| it.relation.seg2.conflictSegments.contains(centre.relation.seg2)
}
centre.neighbours.addAll(conflicts)
k = max(k, conflicts.count())
Expand Down
15 changes: 12 additions & 3 deletions src/main/kotlin/fi/helsinki/cs/udbms/struct/GlobalOrder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,20 @@
package fi.helsinki.cs.udbms.struct

class GlobalOrder {
private var order = mutableMapOf<Pebble, Int>()
private var frequency = mutableMapOf<Pebble, Int>()
private var order = mapOf<Pebble, Int>()

fun addAll(pebbles: Iterable<Pebble>) {
order.putAll(pebbles.groupingBy { it }.eachCount())
pebbles.groupingBy { it }.eachCount().forEach { (it, count) ->
val freq = frequency.getOrDefault(it, 0)
frequency[it] = freq + count
}

// re-sort all pebbles
order = frequency.keys.sortedBy { frequency[it] }.withIndex().associate { Pair(it.value, it.index) }
}

fun getOrder(p: Pebble) = order[p] ?: Int.MIN_VALUE
fun getOrder(p: Pebble): Int {
return order[p] ?: Int.MIN_VALUE
}
}
1 change: 1 addition & 0 deletions src/main/kotlin/fi/helsinki/cs/udbms/struct/Segment.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package fi.helsinki.cs.udbms.struct

class Segment(label: String) {
var segmentedString: SegmentedString? = null
var conflictSegments = emptySet<Segment>()
val label = label.split(' ').joinToString(separator = " ") { it.substring(it.indexOf(':') + 1) }
val wordIds = label.split(' ').map { it.substring(0, it.indexOf(':')).toInt() }
val numberOfWords = wordIds.size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ class SegmentedString(val id: Int, val segments: List<Segment>) {
constructor(id: Int, segments: List<String>, dummy: Unit)
: this(id, segments.map { Segment(it) }) {
this.segments.forEach { it.segmentedString = this }
this.segments.forEach {
it.conflictSegments = this.segments
.filterNot { other -> other == it }
.filter { other -> other.wordIds.intersect(it.wordIds).isNotEmpty() }.toSet()
}
}

fun unionSegments(): String = segments.joinToString(separator = ";")

private fun unionAllSegments(): String = segments.joinToString(separator = ";")

override fun toString() = "[$id] $numberOfTokens tokens"

private fun calculateMinPartitionSize(): Int {
Expand Down
10 changes: 8 additions & 2 deletions src/main/kotlin/fi/helsinki/cs/udbms/util/RuntimeParameters.kt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ class RuntimeParameters(parser: ArgParser) {
if (value < 1) throw InvalidArgumentException("Number of common signatures must be at least 1")
}

val filter by parser.mapping(
"--filter-fast" to "Fast",
"--filter-dp" to "DP",
help = "Specify the filtering method: Fast (Heuristic) and DP (Dynamic Programming) (default: --filter-fast)"
).default { "Fast" }

val verify by parser.mapping(
"--verify-greedy" to "Greedy",
"--verify-squareimp" to "SquareImp",
Expand All @@ -93,8 +99,8 @@ class RuntimeParameters(parser: ArgParser) {

val output by parser.storing(
"-o", "--output",
help = "name of file for writing join results (default: to stdout)"
).default("")
help = "method for handling join results: null (no output), stdout (to standard output), or a filename (output as csv) (default: -o null)"
).default("null")

val threshold by parser.positional(
"THRESHOLD",
Expand Down

0 comments on commit 232e6d1

Please sign in to comment.