Skip to content

Commit

Permalink
Small refactorings
Browse files Browse the repository at this point in the history
  • Loading branch information
Lanayx committed Oct 16, 2023
1 parent bd54886 commit 3a09df6
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions src/Pulsar.Client/Api/PulsarClient.fs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type PulsarClient internal (config: PulsarClientConfiguration) as this =
| Active -> ()
| _ -> raise <| AlreadyClosedException("Client already closed. State: " + this.ClientState.ToString())

let getActiveScmema (schema: ISchema<'T>) (topic:TopicName) =
let getActiveSchema (schema: ISchema<'T>) (topic:TopicName) =
backgroundTask {
let mutable activeSchema = schema
if schema.GetType() = autoConsumeStubType then
Expand Down Expand Up @@ -173,7 +173,7 @@ type PulsarClient internal (config: PulsarClientConfiguration) as this =
backgroundTask {
let! schemaProvider = this.PreProcessSchemaBeforeSubscribe(schema, topic.CompleteTopicName)
let! metadata = lookupService.GetPartitionedTopicMetadata topic.CompleteTopicName
let! activeSchema = getActiveScmema schema topic
let! activeSchema = getActiveSchema schema topic
return {
TopicName = topic
Schema = activeSchema
Expand Down Expand Up @@ -221,11 +221,11 @@ type PulsarClient internal (config: PulsarClientConfiguration) as this =
backgroundTask {
checkIfActive()
Log.Logger.LogDebug("MultiTopicSubscribeAsync started")
let! partitionsForTopis =
let! partitionsForTopics =
consumerConfig.Topics
|> Seq.map (fun topic -> this.GetConsumerInitInfo(schema, topic))
|> Task.WhenAll
let! consumer = MultiTopicsConsumerImpl.InitMultiTopic(consumerConfig, config, connectionPool, partitionsForTopis,
let! consumer = MultiTopicsConsumerImpl.InitMultiTopic(consumerConfig, config, connectionPool, partitionsForTopics,
lookupService, interceptors, removeConsumer)
addConsumer consumer
return consumer :> IConsumer<'T>
Expand All @@ -238,7 +238,7 @@ type PulsarClient internal (config: PulsarClientConfiguration) as this =
let topic = consumerConfig.SingleTopic
let! schemaProvider = this.PreProcessSchemaBeforeSubscribe(schema, topic.CompleteTopicName)
let! metadata = lookupService.GetPartitionedTopicMetadata topic.CompleteTopicName
let! activeSchema = getActiveScmema schema topic
let! activeSchema = getActiveSchema schema topic
if metadata.IsMultiPartitioned then
let consumerInitInfo = {
TopicName = topic
Expand Down Expand Up @@ -291,7 +291,7 @@ type PulsarClient internal (config: PulsarClientConfiguration) as this =
Log.Logger.LogDebug("CreateReaderAsync started")
let! metadata = lookupService.GetPartitionedTopicMetadata readerConfig.Topic.CompleteTopicName
let! schemaProvider = this.PreProcessSchemaBeforeSubscribe(schema, readerConfig.Topic.CompleteTopicName)
let! activeSchema = getActiveScmema schema readerConfig.Topic
let! activeSchema = getActiveSchema schema readerConfig.Topic
let! reader =
if metadata.IsMultiPartitioned then
if MultiTopicsConsumerImpl<_>.isIllegalMultiTopicsMessageId readerConfig.StartMessageId.Value then
Expand All @@ -316,12 +316,12 @@ type PulsarClient internal (config: PulsarClientConfiguration) as this =
Log.Logger.LogDebug("CreateTableViewReaderAsync started")
let! metadata = lookupService.GetPartitionedTopicMetadata tableViewConfig.Topic.CompleteTopicName
let! schemaProvider = this.PreProcessSchemaBeforeSubscribe(schema, tableViewConfig.Topic.CompleteTopicName)
let! activeSchema = getActiveScmema schema tableViewConfig.Topic
let! activeSchema = getActiveSchema schema tableViewConfig.Topic
let readerConfig = {
ReaderConfiguration.Default with
Topic = tableViewConfig.Topic
StartMessageId = Some MessageId.Earliest
ReadCompacted = true
ReadCompacted = tableViewConfig.Topic.IsPersistent
AutoUpdatePartitions = tableViewConfig.AutoUpdatePartitions
AutoUpdatePartitionsInterval = tableViewConfig.AutoUpdatePartitionsInterval
}
Expand All @@ -345,7 +345,7 @@ type PulsarClient internal (config: PulsarClientConfiguration) as this =
backgroundTask {
checkIfActive()
Log.Logger.LogDebug("CreateTableViewAsync started")
let! tableView = TableViewImpl.Init((fun () -> this.CreateTableViewReaderAsync(tableViewConfig,schema)))
let! tableView = TableViewImpl.Init (fun () -> this.CreateTableViewReaderAsync(tableViewConfig,schema))
return tableView :> ITableView<'T>
}

Expand Down

0 comments on commit 3a09df6

Please sign in to comment.