diff --git a/.idea/artifacts/AU_Join_jar.xml b/.idea/artifacts/AU_Join_jar.xml
index 2866676..c05bd3e 100644
--- a/.idea/artifacts/AU_Join_jar.xml
+++ b/.idea/artifacts/AU_Join_jar.xml
@@ -8,6 +8,7 @@
+
\ No newline at end of file
diff --git a/src/main/kotlin/fi/helsinki/cs/udbms/AdaptivePrefixFilter.kt b/src/main/kotlin/fi/helsinki/cs/udbms/AdaptivePrefixFilter.kt
index 7fe8509..b17e2b6 100644
--- a/src/main/kotlin/fi/helsinki/cs/udbms/AdaptivePrefixFilter.kt
+++ b/src/main/kotlin/fi/helsinki/cs/udbms/AdaptivePrefixFilter.kt
@@ -27,14 +27,16 @@ package fi.helsinki.cs.udbms
 import fi.helsinki.cs.udbms.struct.*
 import fi.helsinki.cs.udbms.util.forEachParallelOrSequential
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
 import kotlin.math.max
 import kotlin.math.min
 
 class AdaptivePrefixFilter(private val threshold: Double, private val overlap: Int) {
     fun getCandidates(signature1: Map>, index2: InvertedIndex)
-            : List {
+            : Pair, Int> {
         val candidatePairs = ConcurrentHashMap()
+        val numberOfPairs = AtomicInteger()
 
         signature1.entries.forEachParallelOrSequential { (string1, keys1) ->
             val usedSegments = mutableMapOf>()
@@ -45,6 +47,8 @@ class AdaptivePrefixFilter(private val threshold: Double, private val overlap: I
                 val segment1 = key1.segment
                 val list2 = index2.getList(key1) ?: return@nextKey
 
+                numberOfPairs.getAndIncrement()
+
                 list2.forEach nextString2@{ segment2 ->
                     val string2 = segment2.segmentedString ?: return@nextString2
@@ -76,6 +80,6 @@ class AdaptivePrefixFilter(private val threshold: Double, private val overlap: I
             }
         }
 
-        return candidatePairs.keys.toList()
+        return Pair(candidatePairs.keys.toList(), numberOfPairs.toInt())
     }
 }
diff --git a/src/main/kotlin/fi/helsinki/cs/udbms/MainEstimation.kt b/src/main/kotlin/fi/helsinki/cs/udbms/MainEstimation.kt
new file mode 100644
index 0000000..39ad9ea
--- /dev/null
+++ b/src/main/kotlin/fi/helsinki/cs/udbms/MainEstimation.kt
@@ -0,0 +1,277 @@
+//
+// MIT License
+//
+// Copyright (c) 2019 pengxu
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+//
+
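+// Estimates which overlap parameter gives the cheapest join: a test drive on small fixed samples
+// measures the filtering cost per pair and the verification cost per candidate, then Bernoulli
+// samples of both lists are joined repeatedly, the sampled costs are scaled up to the full data
+// sets, and the loop stops once the error bounds separate the best overlap value from the rest.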
+package fi.helsinki.cs.udbms
+
+import com.xenomachina.argparser.mainBody
+import fi.helsinki.cs.udbms.struct.*
+import fi.helsinki.cs.udbms.util.*
+import java.util.concurrent.ThreadLocalRandom
+import kotlin.math.pow
+import kotlin.system.measureTimeMillis
+
+fun main(args: Array<String>): Unit = mainBody {
+    val params = EstimationParameters.initialise(args)
+    Dispatcher.initialise(params.singleThread)
+
+    /*=================================================================*/
+
+    print("Reading string... ")
+    val list1 = IO.readSegmentedStrings(params.list1)
+    val list2 = IO.readSegmentedStrings(params.list2)
+    println("${list1.size} + ${list2.size} strings loaded")
+
+    var syn: SynonymKnowledge? = null
+    if (params.synonym.isNotEmpty()) {
+        print("Reading synonym... ")
+        syn = IO.readSynonym(params.synonym)
+        println("${syn.knowledge.size} rules loaded")
+    }
+    var tax: TaxonomyKnowledge? = null
+    if (params.taxonomy.isNotEmpty()) {
+        print("Reading taxonomy... ")
+        tax = IO.readTaxonomy(params.taxonomy)
+        println("${tax.knowledge.size} nodes loaded")
+    }
+
+    /*=================================================================*/
+
+    val pebbles1 = list1.mapParallel { Pair(it, PebbleGenerator(syn, tax, params.gram).generate(it)) }.toMap()
+    val pebbles2 = list2.mapParallel { Pair(it, PebbleGenerator(syn, tax, params.gram).generate(it)) }.toMap()
+
+    val order = GlobalOrder()
+    order.addAll(pebbles1.values.flatten())
+    order.addAll(pebbles2.values.flatten())
+
+    /*=================================================================*/
+
+    print("Running test drive... ")
+    var verified = false
+    val (filterTime, verifyTime) = params.overlapList.map {
+        val result = testDrive(
+            params, list1.shuffled().take(2000), list2.shuffled().take(2000),
+            pebbles1, pebbles2, order, syn, tax, it, verified
+        )
+        verified = true
+        return@map Pair(result.filterTime, result.verifyTime)
+    }.reduceIndexed { i, acc, now ->
+        Pair(
+            acc.first + (now.first - acc.first) / (i + 1),
+            if (now.second == 0.0) acc.second else acc.second + (now.second - acc.second) / (i + 1)
+        )
+    }
+    println("filtering time ${filterTime.format(4)} ms/pair, verification time ${verifyTime.format(4)} ms/candidate")
+
+    /*=================================================================*/
+
+    print("\t")
+    print("i\t")
+    params.overlapList.forEach { print("mean$it\t") }
+    params.overlapList.forEach { print("error$it\t") }
+    println()
+
+    /*=================================================================*/
+
+    val p1 = params.sampleSize.toDouble() / list1.size
+    val p2 = params.sampleSize.toDouble() / list2.size
+
+    var lastEstimations = emptyMap<Int, Estimation>()
+
+    var iteration = 0
+    while (++iteration <= params.iteration) {
+        val sample1 = getBernoulliSample(list1, p1)
+        val sample2 = getBernoulliSample(list2, p2)
+
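+        // Join the two samples once per overlap value and scale the observed work back to the full
+        // data sets: counts are divided by the sampling rates (p1 * p2) and converted to time using
+        // the per-pair filtering and per-candidate verification costs measured in the test drive.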
print("${it.value.meanOfScaledCost.format(0)}\t") } + estimations.forEach { print("${it.value.errorOfScaledCost.format(0)}\t") } + println() + + lastEstimations = estimations + + // should we stop here? + if (shouldStop(iteration, estimations)) + break + } + + print("Overlap parameters from the best to the worst: ") + print(lastEstimations.values.sortedBy { it.scaledCost }.map { it.overlap }.joinToString()) + println() + + Dispatcher.shutdown() + return@mainBody +} + +private fun shouldStop(iteration: Int, estimations: Map): Boolean { + if (estimations.size == 1) return true + if (iteration < 5) return false + + val estimationsSorted = estimations.values.sortedBy { it.scaledCost } + + return estimationsSorted.first().getMaxScaledCost() < estimationsSorted.drop(1).first().getMinScaledCost() +} + +private fun getBernoulliSample(data: Iterable, p: Double): List { + val rand = ThreadLocalRandom.current() + @Suppress("UNCHECKED_CAST") + return data.mapParallel { if (rand.nextDouble(1.0) < p) it else Unit }.filterNot { it == Unit } as List +} + +// region Estimation + +private fun movingMean(estimate: Double, iteration: Int, last: Estimation?): Double { + val lastSafe = last ?: Estimation() + + if (iteration == 1) + return estimate + + return lastSafe.meanOfScaledCost + (estimate - lastSafe.meanOfScaledCost) / iteration +} + +private fun movingVariance(estimate: Double, iteration: Int, last: Estimation?): Double { + val lastSafe = last ?: Estimation() + + if (iteration == 1) + return 0.0 + + return (iteration - 2).toDouble() / (iteration - 1).toDouble() * lastSafe.varianceOfScaledCost + + ((estimate - lastSafe.meanOfScaledCost).pow(2) / iteration) +} + +private data class Estimation( + val iteration: Int = 0, + val overlap: Int = 0, + val numberOfPairs: Int = 0, + val numberOfCandidates: Int = 0, + val scaledCost: Double = 0.0, + val meanOfScaledCost: Double = 0.0, + val varianceOfScaledCost: Double = 0.0, + val errorOfScaledCost: Double = 0.0 +) { + fun getMinScaledCost() = scaledCost - errorOfScaledCost + fun getMaxScaledCost() = scaledCost + errorOfScaledCost +} + +// endregion + +// region TestDrive + +private fun testDrive( + params: EstimationParameters, + list1: List, + list2: List, + pebbles1: Map>, + pebbles2: Map>, + order: GlobalOrder, + syn: SynonymKnowledge?, + tax: TaxonomyKnowledge?, + overlap: Int, + skipVerify: Boolean +): TestDriveResult { + System.gc(); System.runFinalization() + + /*=================================================================*/ + + var signatures1: Map> = emptyMap() + var signatures2: Map> = emptyMap() + + run { + val reducer = when (params.filter) { + "Fast" -> FastPebbleReducer(params.threshold, overlap, order) + "DP" -> DynamicProgrammingPebbleReducer(params.threshold, overlap, order) + else -> throw Exception("Invalid filtering method: ${params.filter}") + } + + signatures1 = list1.mapParallel { Pair(it, reducer.reduce(it, pebbles1[it] ?: emptyList())) }.toMap() + signatures2 = list2.mapParallel { Pair(it, reducer.reduce(it, pebbles2[it] ?: emptyList())) }.toMap() + }.run { System.gc(); System.runFinalization(); } + + /*=================================================================*/ + + val index2 = InvertedIndex() + signatures2.map { str -> str.value.map { p -> index2.add(p, p.segment) } } + + /*=================================================================*/ + + var candidates: List = emptyList() + var numberOfPairs = 0 + val filterTimeTotal = measureTimeMillis { + val result = AdaptivePrefixFilter(params.threshold, 
+    var candidates: List = emptyList()
+    var numberOfPairs = 0
+    val filterTimeTotal = measureTimeMillis {
+        val result = AdaptivePrefixFilter(params.threshold, overlap).getCandidates(signatures1, index2)
+        candidates = result.first
+        numberOfPairs = result.second
+    }
+
+    /*=================================================================*/
+
+    var verifyTimeTotal = 0L
+    if (!skipVerify) {
+        val verifier = when (params.verify) {
+            "Greedy" -> GreedySimilarityVerifier(params.threshold, syn, tax, params.gram)
+            "SquareImp" -> SquareImpSimilarityVerifier(params.threshold, syn, tax, params.gram)
+            "SquareImp-Improved" -> SquareImpSimilarityVerifier(params.threshold, syn, tax, params.gram, true)
+            else -> throw Exception("Invalid verification method: ${params.verify}")
+        }
+
+        verifyTimeTotal = measureTimeMillis {
+            candidates.mapParallelOrSequential { Pair(it, verifier.getSimilarity(it.first, it.second)) }
+                .filter { it.second.start >= params.threshold }
+                .toList()
+        }
+    }
+    /*=================================================================*/
+
+    System.gc(); System.runFinalization()
+
+    return TestDriveResult(
+        filterTimeTotal.toDouble() / numberOfPairs,
+        verifyTimeTotal.toDouble() / candidates.size,
+        numberOfPairs,
+        candidates.size
+    )
+}
+
+private data class TestDriveResult(
+    val filterTime: Double,
+    val verifyTime: Double,
+    val numberOfPairs: Int,
+    val numberOfCandidates: Int
+)
+
+// endregion
diff --git a/src/main/kotlin/fi/helsinki/cs/udbms/MainJoin.kt b/src/main/kotlin/fi/helsinki/cs/udbms/MainJoin.kt
index e16f555..dcb8136 100644
--- a/src/main/kotlin/fi/helsinki/cs/udbms/MainJoin.kt
+++ b/src/main/kotlin/fi/helsinki/cs/udbms/MainJoin.kt
@@ -32,7 +32,8 @@ import java.io.File
 import kotlin.system.measureTimeMillis
 
 fun main(args: Array<String>) = mainBody {
-    val params = RuntimeParameters.initialise(args)
+    val params = JoinParameters.initialise(args)
+    Dispatcher.initialise(params.singleThread)
 
     /*=================================================================*/
 
@@ -97,7 +98,7 @@ fun main(args: Array<String>) = mainBody {
     print("Filtering using ${params.filter} on ${if (params.singleThread) "a single thread" else "multiple threads"}... ")
     var candidates: List = emptyList()
     var time = measureTimeMillis {
-        candidates = AdaptivePrefixFilter(params.threshold, params.overlap).getCandidates(signatures1, index2)
+        candidates = AdaptivePrefixFilter(params.threshold, params.overlap).getCandidates(signatures1, index2).first
     }
     println("${candidates.size} candidates obtained in $time ms")
 
@@ -124,7 +125,7 @@ fun main(args: Array<String>) = mainBody {
     val bw: BufferedWriter? = when (params.output) {
         "null" -> null
-        "stdout" -> System.out.bufferedWriter()
+        "stdout" -> null
         else -> {
             print("Writing results to ${params.output}... ")
             val w = File(params.output).bufferedWriter()
diff --git a/src/main/kotlin/fi/helsinki/cs/udbms/util/Dispatcher.kt b/src/main/kotlin/fi/helsinki/cs/udbms/util/Dispatcher.kt
index e84b040..adbab7d 100644
--- a/src/main/kotlin/fi/helsinki/cs/udbms/util/Dispatcher.kt
+++ b/src/main/kotlin/fi/helsinki/cs/udbms/util/Dispatcher.kt
@@ -25,18 +25,26 @@ package fi.helsinki.cs.udbms.util
 
 import kotlinx.coroutines.asCoroutineDispatcher
+import java.util.concurrent.ExecutorService
 import java.util.concurrent.Executors
 
-object Dispatcher {
-    @JvmStatic
-    private val threadPool = Executors.newFixedThreadPool(
-        if (RuntimeParameters.getInstance()?.singleThread == true) 1
-        else Runtime.getRuntime().availableProcessors()
-    )
+class Dispatcher {
+    companion object {
+        @JvmStatic
+        private var threadPool: ExecutorService? = null
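+        // created by initialise(): one thread when --single is given, otherwise one per available processor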
-    @JvmStatic
-    fun getInstance() = threadPool.asCoroutineDispatcher()
+        @JvmStatic
+        fun initialise(singleThread: Boolean) {
-    @JvmStatic
-    fun shutdown() = threadPool.shutdown()
+            threadPool = Executors.newFixedThreadPool(
+                if (singleThread) 1 else Runtime.getRuntime().availableProcessors()
+            )
+        }
+
+        @JvmStatic
+        fun getInstance() = threadPool!!.asCoroutineDispatcher()
+
+        @JvmStatic
+        fun shutdown() = threadPool?.shutdown()
+    }
 }
diff --git a/src/main/kotlin/fi/helsinki/cs/udbms/util/EstimationParameters.kt b/src/main/kotlin/fi/helsinki/cs/udbms/util/EstimationParameters.kt
new file mode 100644
index 0000000..020d158
--- /dev/null
+++ b/src/main/kotlin/fi/helsinki/cs/udbms/util/EstimationParameters.kt
@@ -0,0 +1,140 @@
+//
+// MIT License
+//
+// Copyright (c) 2019 pengxu
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+//
+
+package fi.helsinki.cs.udbms.util
+
+import com.xenomachina.argparser.ArgParser
+import com.xenomachina.argparser.DefaultHelpFormatter
+import com.xenomachina.argparser.InvalidArgumentException
+import com.xenomachina.argparser.default
+import kotlin.system.exitProcess
+
+class EstimationParameters(parser: ArgParser) {
+    companion object {
+        @JvmStatic
+        private var parser: EstimationParameters? = null
+
+        @JvmStatic
+        fun initialise(args: Array<String>): EstimationParameters {
+            parser = ArgParser(
+                args,
+                helpFormatter = DefaultHelpFormatter(
+                    epilogue = """
+                    Example: ./AU-Est --taxonomy tax.txt --synonym syn.txt --jaccard 3 -oresult.csv 0.9 list1.txt list2.txt 1 2 3 4 5
+                    """.trimIndent()
+                )
+            ).parseInto(::EstimationParameters)
+
+            if (parser!!.synonym.isEmpty() && parser!!.taxonomy.isEmpty() && parser!!.gram == 0) {
+                println("You must specify at least one of --jaccard, --taxonomy, or --synonym")
+                exitProcess(1)
+            }
+            return parser!!
+        }
+
+        @JvmStatic
+        fun getInstance() = parser
+    }
+
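+    // Command-line options; initialise() additionally requires at least one of --jaccard, --taxonomy, --synonym.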
+    val gram by parser.storing(
+        "--jaccard",
+        help = "enable Jaccard similarity and set gram length (> 1)"
+    ) { toInt() }.default(0)
+
+    val taxonomy by parser.storing(
+        "--taxonomy",
+        help = "enable taxonomy similarity and specify the filename of taxonomy knowledge"
+    ) { toString() }.default { "" }
+
+    val synonym by parser.storing(
+        "--synonym",
+        help = "enable synonym similarity and specify the filename of synonym knowledge"
+    ) { toString() }.default { "" }
+
+    val filter by parser.mapping(
+        "--filter-fast" to "Fast",
+        "--filter-dp" to "DP",
+        help = "Specify the filtering method: Fast (Heuristic) and DP (Dynamic Programming) (default: --filter-fast)"
+    ).default { "Fast" }
+
+    val verify by parser.mapping(
+        "--verify-greedy" to "Greedy",
+        "--verify-squareimp" to "SquareImp",
+        "--verify-squareimp-improved" to "SquareImp-Improved",
+        help = "Specify the verification method: Greedy, SquareImp, or our improved SquareImp (default: --verify-greedy)"
+    ).default { "Greedy" }
+
+    val singleThread by parser.flagging(
+        "--single",
+        help = "perform filtering and verification on a single thread (default: on multiple threads)"
+    )
+
+    val sampleSize by parser.storing(
+        "-s", "--sample-size",
+        help = "specify the expected sample size for estimation (> 0, default: 100)"
+    ) { toInt() }.default { 100 }.addValidator {
+        if (value <= 0) throw InvalidArgumentException("the expected sample size must be at least 1")
+    }
+
+    val quantile by parser.storing(
+        "-q", "--quantile",
+        help = "specify the quantile for Student t-distribution (default: 0.842 for 60% confidence levels on both sides)"
+    ) { toDouble() }.default { 0.842 }
+
+    val iteration by parser.storing(
+        "-i", "--iteration",
+        help = "limit the number of iterations (> 0, default: 20)"
+    ) { toInt() }.default { 20 }.addValidator {
+        if (value <= 0) throw InvalidArgumentException("the number of iterations must be at least 1")
+    }
+
+    val threshold by parser.positional(
+        "THRESHOLD",
+        help = "similarity threshold (0, 1]"
+    ) { toDouble() }.default(0.0).addValidator {
+        if (value <= 0 || value > 1) throw InvalidArgumentException("The similarity threshold must be within (0, 1].")
+    }
+
+    val list1 by parser.positional(
+        "LIST_1",
+        help = "filename of the first segmented string list"
+    ).default("").addValidator {
+        if (value.isEmpty()) throw InvalidArgumentException("You must specify two datasets")
+    }
+
+    val list2 by parser.positional(
+        "LIST_2",
+        help = "filename of the second segmented string list"
+    ).default("").addValidator {
+        if (value.isEmpty()) throw InvalidArgumentException("You must specify two datasets")
+    }
+
+    val overlapList by parser.positionalList(
+        "OVERLAPS",
+        "Values of overlap to be tested",
+        1..Int.MAX_VALUE
+    ) { toInt() }.default { emptyList() }.addValidator {
+        if (value.isEmpty()) throw InvalidArgumentException("You must specify at least one overlap value")
+    }
+}
\ No newline at end of file
diff --git a/src/main/kotlin/fi/helsinki/cs/udbms/util/RuntimeParameters.kt b/src/main/kotlin/fi/helsinki/cs/udbms/util/JoinParameters.kt
similarity index 95%
rename from src/main/kotlin/fi/helsinki/cs/udbms/util/RuntimeParameters.kt
rename to src/main/kotlin/fi/helsinki/cs/udbms/util/JoinParameters.kt
index 641df19..5101214 100644
--- a/src/main/kotlin/fi/helsinki/cs/udbms/util/RuntimeParameters.kt
+++ b/src/main/kotlin/fi/helsinki/cs/udbms/util/JoinParameters.kt
@@ -30,13 +30,13 @@ import com.xenomachina.argparser.InvalidArgumentException
 import com.xenomachina.argparser.default
 import kotlin.system.exitProcess
 
-class RuntimeParameters(parser: ArgParser) {
+class JoinParameters(parser: ArgParser) {
     companion object {
         @JvmStatic
-        private var parser: RuntimeParameters? = null
+        private var parser: JoinParameters? = null
 
         @JvmStatic
-        fun initialise(args: Array<String>): RuntimeParameters {
+        fun initialise(args: Array<String>): JoinParameters {
             parser = ArgParser(
                 args,
                 helpFormatter = DefaultHelpFormatter(
@@ -44,7 +44,7 @@ class RuntimeParameters(parser: ArgParser) {
                     Example: ./AU-Join --taxonomy tax.txt --synonym syn.txt --jaccard 3 -c3 -oresult.csv 0.9 list1.txt list2.txt
                     """.trimIndent()
                 )
-            ).parseInto(::RuntimeParameters)
+            ).parseInto(::JoinParameters)
 
             if (parser!!.synonym.isEmpty() && parser!!.taxonomy.isEmpty() && parser!!.gram == 0) {
                 println("You must specify at least one of --jaccard, --taxonomy, or --synonym")