Skip to content

Commit

Permalink
Merge pull request #5 from juitnow/noref-interval
Browse files Browse the repository at this point in the history
Un-reference intervals and timeouts
  • Loading branch information
pfumagalli authored Jan 24, 2024
2 parents 85425b7 + 23b3939 commit 41a877b
Show file tree
Hide file tree
Showing 15 changed files with 1,261 additions and 1,203 deletions.
2,346 changes: 1,177 additions & 1,169 deletions package-lock.json

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "memx",
"version": "0.3.1",
"version": "0.3.2",
"description": "Simple and fast memcached client",
"main": "./dist/index.cjs",
"module": "./dist/index.mjs",
Expand Down Expand Up @@ -28,14 +28,14 @@
"author": "Juit Developers <[email protected]>",
"license": "Apache-2.0",
"devDependencies": {
"@plugjs/build": "^0.4.7",
"@types/chai": "^4.3.5",
"@types/chai-as-promised": "^7.1.5",
"@types/memjs": "^1.3.0",
"chai": "^4.3.7",
"@plugjs/build": "^0.5.22",
"@types/chai": "^4.3.11",
"@types/chai-as-promised": "^7.1.8",
"@types/memjs": "^1.3.3",
"chai": "<5.0.0",
"chai-as-promised": "^7.1.1",
"chai-exclude": "^2.1.0",
"memjs": "^1.3.1"
"memjs": "^1.3.2"
},
"directories": {
"test": "test"
Expand Down
2 changes: 1 addition & 1 deletion src/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import assert from 'node:assert'

import { ServerAdapter } from './server'

import type { Adapter, Counter, AdapterResult, Stats } from './types'
import type { ServerOptions } from './server'
import type { Adapter, AdapterResult, Counter, Stats } from './types'

function parseHosts(hosts?: string): ServerOptions[] {
const result: { host: string, port?: number }[] = []
Expand Down
8 changes: 4 additions & 4 deletions src/connection.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import assert from 'node:assert'
import net from 'node:net'

import { Encoder } from './encode'
import { Decoder } from './decode'
import { BUFFERS, OPCODE } from './constants'
import { Decoder } from './decode'
import { Encoder } from './encode'
import { socketFinalizationRegistry } from './internals'

import type { Socket } from 'node:net'
import type { RawIncomingPacket } from './decode'
import type { RawOutgoingPacket } from './encode'
import type { Socket } from 'node:net'

export type RawIncomingPackets = [ RawIncomingPacket, ...RawIncomingPacket[] ]

Expand Down Expand Up @@ -173,7 +173,7 @@ export class Connection {
if (error) return deferred.reject(error)
})

const timeout = setTimeout(() => deferred.reject(new Error('No response')), this.#timeout)
const timeout = setTimeout(() => deferred.reject(new Error('No response')), this.#timeout).unref()

return deferred.promise.finally(() => {
clearTimeout(timeout)
Expand Down
2 changes: 1 addition & 1 deletion src/decode.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import assert from 'node:assert'

import { DATA_TYPE, EMPTY_BUFFER, MAGIC, OFFSETS } from './constants'
import { allocateBuffer } from './buffers'
import { DATA_TYPE, EMPTY_BUFFER, MAGIC, OFFSETS } from './constants'

import type { RecyclableBuffer } from './buffers'

Expand Down
2 changes: 1 addition & 1 deletion src/encode.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { allocateBuffer } from './buffers'
import { DATA_TYPE, EMPTY_BUFFER, MAGIC, VBUCKET, OFFSETS } from './constants'
import { DATA_TYPE, EMPTY_BUFFER, MAGIC, OFFSETS, VBUCKET } from './constants'

import type { RecyclableBuffer } from './buffers'
import type { OPCODE } from './constants'
Expand Down
2 changes: 1 addition & 1 deletion src/fake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ export class FakeAdapter implements Adapter {
if (! ttl) return this.#cache.clear()

const wait = toExp(ttl) - Date.now()
setTimeout(() => this.#cache.clear(), wait)
setTimeout(() => this.#cache.clear(), wait).unref()
}

async noop(): Promise<void> {
Expand Down
17 changes: 6 additions & 11 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
export type { RecyclableBuffer } from './buffers'

import * as connection from './connection'
import * as constants from './constants'
import * as decode from './decode'
import * as encode from './encode'
import * as constants from './constants'
import * as connection from './connection'

export {
decode,
encode,
constants,
connection,
}
export { connection, constants, decode, encode }

export * from './fake'
export * from './types'
export * from './client'
export * from './cluster'
export * from './fake'
export * from './server'
export * from './client'
export * from './types'
export * from './utils'
2 changes: 1 addition & 1 deletion src/server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Connection } from './connection'
import { BUFFERS, OPCODE, STATUS } from './constants'

import type { Adapter, Counter, AdapterResult, Stats } from './types'
import type { ConnectionOptions } from './connection'
import type { RawIncomingPacket } from './decode'
import type { Adapter, AdapterResult, Counter, Stats } from './types'

const statsBigInt: readonly string[] = [
'auth_cmds', 'auth_errors', 'bytes', 'bytes_read', 'bytes_written', 'cas_badval', 'cas_hits', 'cas_misses',
Expand Down
4 changes: 2 additions & 2 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ export class PoorManLock {
do {
cas = await this.#client.add(this.#name, owner, { ttl: 2 })
if (cas !== undefined) break
await new Promise((resolve) => setTimeout(resolve, 100))
await new Promise((resolve) => void setTimeout(resolve, 100).unref())
} while (Date.now() < end)

if (cas === undefined) {
Expand All @@ -154,7 +154,7 @@ export class PoorManLock {
assert(replaced !== undefined, `Lock "${this.#client.prefix}${this.#name}" not replaced`)
cas = replaced
})(), `Error extending lock "${this.#client.prefix}${this.#name}"`)
}, 100)
}, 100).unref()

try {
return await executor()
Expand Down
2 changes: 1 addition & 1 deletion test/01-decode.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { randomFillSync } from 'node:crypto'

import { expect } from 'chai'

import { decode, constants } from '../src/index'
import { constants, decode } from '../src/index'


describe('Decoding Packets', () => {
Expand Down
2 changes: 1 addition & 1 deletion test/02-encode.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai'

import { encode, constants } from '../src/index'
import { constants, encode } from '../src/index'

describe('Encoding Packets', () => {
it('should encode a packet with all information required', () => {
Expand Down
2 changes: 1 addition & 1 deletion test/06-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { randomBytes } from 'node:crypto'

import { expect } from 'chai'

import { MemxClient, ClusterAdapter, ServerAdapter } from '../src/index'
import { ClusterAdapter, MemxClient, ServerAdapter } from '../src/index'

import type { Adapter } from '../src/index'

Expand Down
32 changes: 30 additions & 2 deletions test/10-utils.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { randomBytes } from 'node:crypto'

import { exec } from '@plugjs/build'
import { expect } from 'chai'


import { Bundle, MemxClient, Factory, PoorManLock } from '../src/index'
import { Bundle, Factory, MemxClient, PoorManLock } from '../src/index'

describe('Utilities', () => {
const host = process.env.MEMCACHED_HOST || '127.0.0.1'
Expand Down Expand Up @@ -273,5 +273,33 @@ describe('Utilities', () => {
expect((await client.getc(key))?.value).to.be.undefined
expect(record).to.eql([ 'create 1', 'create 2', 'start 1', 'end 1' ])
}, 3000)

it('should acquire when another process holding the lock crashes', async () => {
const lockname = `distributed-${process.pid}-${Math.floor(Math.random() * 100000)}`
const child = exec('tsrun', './test/locker.ts', lockname)

// this should give the child process enough time to start and lock
await new Promise((resolve) => void setTimeout(resolve, 500))

// the first attempt to locking should fail, the child should be locking!
await expect(new PoorManLock(client, lockname).execute(() => {
log.error('Initial lock attempt succesful')
}, { timeout: 100, owner: `test-parent-${process.pid}` }))
.to.be.rejectedWith(/timeout/)

// the second attempt should succeed, once the child dies...
const p1 = new PoorManLock(client, lockname).execute(() => {
log('Parent process executing 1')
}, { owner: `test-parent-${process.pid}@1` })
const p2 = new PoorManLock(client, lockname).execute(() => {
log('Parent process executing 2')
}, { owner: `test-parent-${process.pid}@2` })
const p3 = new PoorManLock(client, lockname).execute(() => {
log('Parent process executing 3')
}, { owner: `test-parent-${process.pid}@3` })

// reap up the child's leftovers...
await Promise.all([ p1, p2, p3, child ])
}, 10000)
})
})
27 changes: 27 additions & 0 deletions test/locker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* eslint-disable no-console */
import { log } from '@plugjs/build'

import { MemxClient, PoorManLock } from '../src/index'

const host = process.env.MEMCACHED_HOST || '127.0.0.1'
const port = parseInt(process.env.MEMCACHED_PORT || '11211')
const client = new MemxClient({ hosts: [ { host, port } ] })

async function test(): Promise<void> {
console.log(`Child process using lock "${process.argv[2]}"`)

const lock = new PoorManLock(client, process.argv[2])
try {
await lock.execute(async () => {
console.log('Child process locking')
await new Promise((resolve) => void setTimeout(resolve, 2000))
console.log('Child process exiting')
process.exit(123)
}, { owner: `test-child-${process.pid}` })
} finally {
console.log('Child process exit interrupted ???')
process.exit(123)
}
}

test().catch((error) => log.error('Error in child process test', error))

0 comments on commit 41a877b

Please sign in to comment.