Commit
Implement estimation algorithm
xupefei committed Jul 18, 2019
1 parent 232e6d1 commit 8c8bad3
Showing 7 changed files with 450 additions and 19 deletions.
1 change: 1 addition & 0 deletions .idea/artifacts/AU_Join_jar.xml

Some generated files are not rendered by default.

8 changes: 6 additions & 2 deletions src/main/kotlin/fi/helsinki/cs/udbms/AdaptivePrefixFilter.kt
@@ -27,14 +27,16 @@ package fi.helsinki.cs.udbms
 import fi.helsinki.cs.udbms.struct.*
 import fi.helsinki.cs.udbms.util.forEachParallelOrSequential
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
 import kotlin.math.max
 import kotlin.math.min
 
 class AdaptivePrefixFilter(private val threshold: Double, private val overlap: Int) {
 
     fun getCandidates(signature1: Map<SegmentedString, List<Pebble>>, index2: InvertedIndex)
-            : List<SegmentedStringPair> {
+            : Pair<List<SegmentedStringPair>, Int> {
         val candidatePairs = ConcurrentHashMap<SegmentedStringPair, Unit>()
+        val numberOfPairs = AtomicInteger()
 
         signature1.entries.forEachParallelOrSequential { (string1, keys1) ->
             val usedSegments = mutableMapOf<SegmentedString, MutableSet<Segment>>()
@@ -45,6 +47,8 @@ class AdaptivePrefixFilter(private val threshold: Double, private val overlap: Int)
                 val segment1 = key1.segment
                 val list2 = index2.getList(key1) ?: return@nextKey
 
+                numberOfPairs.getAndIncrement()
+
                 list2.forEach nextString2@{ segment2 ->
                     val string2 = segment2.segmentedString ?: return@nextString2
 
@@ -76,6 +80,6 @@ class AdaptivePrefixFilter(private val threshold: Double, private val overlap: Int)
            }
        }
 
-        return candidatePairs.keys.toList()
+        return Pair(candidatePairs.keys.toList(), numberOfPairs.toInt())
     }
 }
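Note: getCandidates now returns both the candidate set and the number of probed (signature token, index list) pairs counted by the AtomicInteger. A minimal caller sketch of the new return type; the function name, the threshold/overlap values, and the assumption that signatures1 and index2 are built as in MainJoin.kt are illustrative only, not part of the commit:

package fi.helsinki.cs.udbms

import fi.helsinki.cs.udbms.struct.InvertedIndex
import fi.helsinki.cs.udbms.struct.Pebble
import fi.helsinki.cs.udbms.struct.SegmentedString

// Illustrative caller: signatures1 and index2 are assumed to come from the join pipeline.
fun reportCandidates(signatures1: Map<SegmentedString, List<Pebble>>, index2: InvertedIndex) {
    // The result is a Pair: first = candidate pairs, second = number of probed pairs.
    val (candidates, numberOfPairs) = AdaptivePrefixFilter(threshold = 0.8, overlap = 1)
        .getCandidates(signatures1, index2)
    println("$numberOfPairs probed pairs yielded ${candidates.size} candidates")
}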
277 changes: 277 additions & 0 deletions src/main/kotlin/fi/helsinki/cs/udbms/MainEstimation.kt
@@ -0,0 +1,277 @@
//
// MIT License
//
// Copyright (c) 2019 pengxu
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//

package fi.helsinki.cs.udbms

import com.xenomachina.argparser.mainBody
import fi.helsinki.cs.udbms.struct.*
import fi.helsinki.cs.udbms.util.*
import java.util.concurrent.ThreadLocalRandom
import kotlin.math.pow
import kotlin.system.measureTimeMillis

fun main(args: Array<String>): Unit = mainBody {
val params = EstimationParameters.initialise(args)
Dispatcher.initialise(params.singleThread)

/*=================================================================*/

print("Reading string... ")
val list1 = IO.readSegmentedStrings(params.list1)
val list2 = IO.readSegmentedStrings(params.list2)
println("${list1.size} + ${list2.size} strings loaded")

var syn: SynonymKnowledge? = null
if (params.synonym.isNotEmpty()) {
print("Reading synonym... ")
syn = IO.readSynonym(params.synonym)
println("${syn.knowledge.size} rules loaded")
}
var tax: TaxonomyKnowledge? = null
if (params.taxonomy.isNotEmpty()) {
print("Reading taxonomy... ")
tax = IO.readTaxonomy(params.taxonomy)
println("${tax.knowledge.size} nodes loaded")
}

/*=================================================================*/

val pebbles1 = list1.mapParallel { Pair(it, PebbleGenerator(syn, tax, params.gram).generate(it)) }.toMap()
val pebbles2 = list2.mapParallel { Pair(it, PebbleGenerator(syn, tax, params.gram).generate(it)) }.toMap()

val order = GlobalOrder()
order.addAll(pebbles1.values.flatten())
order.addAll(pebbles2.values.flatten())

/*=================================================================*/

print("Running test drive... ")
var verified = false
val (filterTime, verifyTime) = params.overlapList.map {
val result = testDrive(
params, list1.shuffled().take(2000), list2.shuffled().take(2000),
pebbles1, pebbles2, order, syn, tax, it, verified
)
verified = true
return@map Pair(result.filterTime, result.verifyTime)
}.reduceIndexed { i, acc, now ->
Pair(
acc.first + (now.first - acc.first) / (i + 1),
if (now.second == 0.0) acc.second else acc.second + (now.second - acc.second) / (i + 1)
)
}
println("filtering time ${filterTime.format(4)} ms/pair, verification time ${verifyTime.format(4)} ms/candidate")

/*=================================================================*/

print("\t")
print("i\t")
params.overlapList.forEach { print("mean$it\t") }
params.overlapList.forEach { print("error$it\t") }
println()

/*=================================================================*/

val p1 = params.sampleSize.toDouble() / list1.size
val p2 = params.sampleSize.toDouble() / list2.size

var lastEstimations = emptyMap<Int, Estimation>()

var iteration = 0
while (++iteration <= params.iteration) {
val sample1 = getBernoulliSample(list1, p1)
val sample2 = getBernoulliSample(list2, p2)

val estimations = params.overlapList.map { overlap ->
val result = testDrive(params, sample1, sample2, pebbles1, pebbles2, order, syn, tax, overlap, true)

val scaledCost =
result.numberOfPairs / (p1 * p2) * filterTime + result.numberOfCandidates / (p1 * p2) * verifyTime
val mean = movingMean(scaledCost, iteration, lastEstimations[overlap])
val variance = movingVariance(scaledCost, iteration, lastEstimations[overlap])
val error = params.quantile * (variance / iteration).pow(0.5)

return@map Estimation(
iteration, overlap, result.numberOfPairs, result.numberOfCandidates,
scaledCost, mean, variance, error
)
}.associateBy { it.overlap }

print("\t")
print("$iteration\t")
estimations.forEach { print("${it.value.meanOfScaledCost.format(0)}\t") }
estimations.forEach { print("${it.value.errorOfScaledCost.format(0)}\t") }
println()

lastEstimations = estimations

// should we stop here?
if (shouldStop(iteration, estimations))
break
}

print("Overlap parameters from the best to the worst: ")
print(lastEstimations.values.sortedBy { it.scaledCost }.map { it.overlap }.joinToString())
println()

Dispatcher.shutdown()
return@mainBody
}

private fun shouldStop(iteration: Int, estimations: Map<Int, Estimation>): Boolean {
if (estimations.size == 1) return true
if (iteration < 5) return false

val estimationsSorted = estimations.values.sortedBy { it.scaledCost }

return estimationsSorted.first().getMaxScaledCost() < estimationsSorted.drop(1).first().getMinScaledCost()
}

private fun <T> getBernoulliSample(data: Iterable<T>, p: Double): List<T> {
val rand = ThreadLocalRandom.current()
@Suppress("UNCHECKED_CAST")
return data.mapParallel { if (rand.nextDouble(1.0) < p) it else Unit }.filterNot { it == Unit } as List<T>
}

// region Estimation

private fun movingMean(estimate: Double, iteration: Int, last: Estimation?): Double {
val lastSafe = last ?: Estimation()

if (iteration == 1)
return estimate

return lastSafe.meanOfScaledCost + (estimate - lastSafe.meanOfScaledCost) / iteration
}

private fun movingVariance(estimate: Double, iteration: Int, last: Estimation?): Double {
val lastSafe = last ?: Estimation()

if (iteration == 1)
return 0.0

return (iteration - 2).toDouble() / (iteration - 1).toDouble() * lastSafe.varianceOfScaledCost +
((estimate - lastSafe.meanOfScaledCost).pow(2) / iteration)
}

private data class Estimation(
val iteration: Int = 0,
val overlap: Int = 0,
val numberOfPairs: Int = 0,
val numberOfCandidates: Int = 0,
val scaledCost: Double = 0.0,
val meanOfScaledCost: Double = 0.0,
val varianceOfScaledCost: Double = 0.0,
val errorOfScaledCost: Double = 0.0
) {
fun getMinScaledCost() = scaledCost - errorOfScaledCost
fun getMaxScaledCost() = scaledCost + errorOfScaledCost
}

// endregion

// region TestDrive

private fun testDrive(
params: EstimationParameters,
list1: List<SegmentedString>,
list2: List<SegmentedString>,
pebbles1: Map<SegmentedString, List<Pebble>>,
pebbles2: Map<SegmentedString, List<Pebble>>,
order: GlobalOrder,
syn: SynonymKnowledge?,
tax: TaxonomyKnowledge?,
overlap: Int,
skipVerify: Boolean
): TestDriveResult {
System.gc(); System.runFinalization()

/*=================================================================*/

var signatures1: Map<SegmentedString, List<Pebble>> = emptyMap()
var signatures2: Map<SegmentedString, List<Pebble>> = emptyMap()

run {
val reducer = when (params.filter) {
"Fast" -> FastPebbleReducer(params.threshold, overlap, order)
"DP" -> DynamicProgrammingPebbleReducer(params.threshold, overlap, order)
else -> throw Exception("Invalid filtering method: ${params.filter}")
}

signatures1 = list1.mapParallel { Pair(it, reducer.reduce(it, pebbles1[it] ?: emptyList())) }.toMap()
signatures2 = list2.mapParallel { Pair(it, reducer.reduce(it, pebbles2[it] ?: emptyList())) }.toMap()
}.run { System.gc(); System.runFinalization(); }

/*=================================================================*/

val index2 = InvertedIndex()
signatures2.map { str -> str.value.map { p -> index2.add(p, p.segment) } }

/*=================================================================*/

var candidates: List<SegmentedStringPair> = emptyList()
var numberOfPairs = 0
val filterTimeTotal = measureTimeMillis {
val result = AdaptivePrefixFilter(params.threshold, overlap).getCandidates(signatures1, index2)
candidates = result.first
numberOfPairs = result.second
}

/*=================================================================*/

var verifyTimeTotal = 0L
if (!skipVerify) {
val verifier = when (params.verify) {
"Greedy" -> GreedySimilarityVerifier(params.threshold, syn, tax, params.gram)
"SquareImp" -> SquareImpSimilarityVerifier(params.threshold, syn, tax, params.gram)
"SquareImp-Improved" -> SquareImpSimilarityVerifier(params.threshold, syn, tax, params.gram, true)
else -> throw Exception("Invalid verification method: ${params.verify}")
}

verifyTimeTotal = measureTimeMillis {
candidates.mapParallelOrSequential { Pair(it, verifier.getSimilarity(it.first, it.second)) }
.filter { it.second.start >= params.threshold }
.toList()
}
}
/*=================================================================*/

System.gc(); System.runFinalization()

return TestDriveResult(
filterTimeTotal.toDouble() / numberOfPairs,
verifyTimeTotal.toDouble() / candidates.size,
numberOfPairs,
candidates.size
)
}

private data class TestDriveResult(
val filterTime: Double,
val verifyTime: Double,
val numberOfPairs: Int,
val numberOfCandidates: Int
)

// endregion
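For readers skimming the new file: a preliminary test drive on up to 2000 strings per list measures the per-pair filtering time and per-candidate verification time; each subsequent iteration draws Bernoulli samples of both lists with rates p1 and p2, runs only the filter on them, and extrapolates a total join cost per overlap setting. After at least five iterations the loop stops once the confidence interval of the cheapest setting falls entirely below that of the runner-up. A self-contained sketch of the cost scaling and interval half-width, mirroring the formulas above (function and parameter names here are illustrative, not part of the commit):

import kotlin.math.sqrt

// Extrapolated cost of the full join from a Bernoulli sample: counts observed on the
// sample are scaled by 1 / (p1 * p2), then weighted by the measured per-pair filtering
// time and per-candidate verification time (both in ms), as in testDrive/Estimation.
fun scaledCost(pairs: Int, candidates: Int, p1: Double, p2: Double,
               filterTimePerPair: Double, verifyTimePerCandidate: Double): Double =
    pairs / (p1 * p2) * filterTimePerPair + candidates / (p1 * p2) * verifyTimePerCandidate

// Half-width of the confidence interval after `iteration` samples, matching
// `params.quantile * (variance / iteration).pow(0.5)` in the main loop.
fun intervalHalfWidth(variance: Double, iteration: Int, quantile: Double): Double =
    quantile * sqrt(variance / iteration)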
7 changes: 4 additions & 3 deletions src/main/kotlin/fi/helsinki/cs/udbms/MainJoin.kt
@@ -32,7 +32,8 @@ import java.io.File
 import kotlin.system.measureTimeMillis
 
 fun main(args: Array<String>) = mainBody {
-    val params = RuntimeParameters.initialise(args)
+    val params = JoinParameters.initialise(args)
+    Dispatcher.initialise(params.singleThread)
 
     /*=================================================================*/
 
@@ -97,7 +98,7 @@ fun main(args: Array<String>) = mainBody {
     print("Filtering using ${params.filter} on ${if (params.singleThread) "a single thread" else "multiple threads"}... ")
     var candidates: List<SegmentedStringPair> = emptyList()
     var time = measureTimeMillis {
-        candidates = AdaptivePrefixFilter(params.threshold, params.overlap).getCandidates(signatures1, index2)
+        candidates = AdaptivePrefixFilter(params.threshold, params.overlap).getCandidates(signatures1, index2).first
     }
     println("${candidates.size} candidates obtained in $time ms")
 
@@ -124,7 +125,7 @@
 
     val bw: BufferedWriter? = when (params.output) {
         "null" -> null
-        "stdout" -> System.out.bufferedWriter()
+        "stdout" -> null
         else -> {
             print("Writing results to ${params.output}... ")
             val w = File(params.output).bufferedWriter()
28 changes: 18 additions & 10 deletions src/main/kotlin/fi/helsinki/cs/udbms/util/Dispatcher.kt
@@ -25,18 +25,26 @@
 package fi.helsinki.cs.udbms.util
 
 import kotlinx.coroutines.asCoroutineDispatcher
+import java.util.concurrent.ExecutorService
 import java.util.concurrent.Executors
 
-object Dispatcher {
-    @JvmStatic
-    private val threadPool = Executors.newFixedThreadPool(
-        if (RuntimeParameters.getInstance()?.singleThread == true) 1
-        else Runtime.getRuntime().availableProcessors()
-    )
+class Dispatcher {
+    companion object {
+        @JvmStatic
+        private var threadPool: ExecutorService? = null
 
-    @JvmStatic
-    fun getInstance() = threadPool.asCoroutineDispatcher()
+        @JvmStatic
+        fun initialise(singleThread: Boolean) {
+
+            threadPool = Executors.newFixedThreadPool(
+                if (singleThread) 1 else Runtime.getRuntime().availableProcessors()
+            )
+        }
 
-    @JvmStatic
-    fun shutdown() = threadPool.shutdown()
+        @JvmStatic
+        fun getInstance() = threadPool!!.asCoroutineDispatcher()
+
+        @JvmStatic
+        fun shutdown() = threadPool?.shutdown()
+    }
 }
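With this change the Dispatcher no longer configures itself from RuntimeParameters; callers must initialise it explicitly before requesting the coroutine dispatcher, since getInstance() now dereferences the pool with !!. A minimal usage sketch under the new API shown above (the wrapping main function is illustrative only):

import fi.helsinki.cs.udbms.util.Dispatcher

fun main() {
    // Initialise the fixed thread pool first; getInstance() throws if this step is skipped.
    Dispatcher.initialise(singleThread = false)
    val dispatcher = Dispatcher.getInstance() // CoroutineDispatcher backed by the pool
    // ... run parallel work on `dispatcher` ...
    Dispatcher.shutdown()
}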