From c7612df34b14373e46b6b07bcf3f7241dd896d6e Mon Sep 17 00:00:00 2001 From: "guorui.fan" Date: Mon, 19 Dec 2022 17:14:53 +0800 Subject: [PATCH] fix: state backend don't merge --- streaming-connector/go.mod | 5 ++- streaming-connector/go.sum | 33 ++++++--------- streaming-core/store/backend.go | 62 ++++++++++++++-------------- streaming-core/store/backend_test.go | 4 +- streaming-core/stream/environment.go | 2 +- streaming-operator/go.mod | 2 +- 6 files changed, 52 insertions(+), 56 deletions(-) diff --git a/streaming-connector/go.mod b/streaming-connector/go.mod index 7a3ba67..61ad312 100644 --- a/streaming-connector/go.mod +++ b/streaming-connector/go.mod @@ -3,7 +3,7 @@ module github.com/RuiFG/streaming/streaming-connector go 1.19 require ( - github.com/RuiFG/streaming/streaming-core v0.1.1 + github.com/RuiFG/streaming/streaming-core v0.1.2 github.com/Shopify/sarama v1.35.0 ) @@ -27,11 +27,14 @@ require ( github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/twmb/murmur3 v1.1.5 // indirect + github.com/uber-go/tally/v4 v4.1.4 // indirect github.com/xujiajun/mmap-go v1.0.1 // indirect github.com/xujiajun/nutsdb v0.11.1 // indirect github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect + go.uber.org/zap v1.24.0 // indirect golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect golang.org/x/net v0.0.0-20220708220712-1185a9018129 // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect diff --git a/streaming-connector/go.sum b/streaming-connector/go.sum index c3c935c..e20ead1 100644 --- a/streaming-connector/go.sum +++ b/streaming-connector/go.sum @@ -33,8 +33,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/RuiFG/streaming/streaming-core v0.0.1 h1:Nc5WN1iJWrqOUcwws9bf29s1xlRVa262Lk5QcjYSyoo= -github.com/RuiFG/streaming/streaming-core v0.0.1/go.mod h1:5EHoUTdiK4Ojay+8f2VGEXcXtaH/EYydvA3lpgzZiZs= +github.com/RuiFG/streaming/streaming-core v0.1.1 h1:g9sM7KbiwJO0dVftX7DokylWfGmS5mPG6TJHuquy7ZY= +github.com/RuiFG/streaming/streaming-core v0.1.1/go.mod h1:QpIhBUEBuCybjTfTKyAdEbl7a5aZRA2Mm422ztFcuS8= github.com/Shopify/sarama v1.35.0 h1:opEGHcK8s5OpQF99wW0D4ol7A3qUpfSFigrDXnWmOcs= github.com/Shopify/sarama v1.35.0/go.mod h1:n8obse6Cz5NjjXjKwR1JeYr7CkQn4KG+HENJ8n/T9oQ= github.com/Shopify/toxiproxy/v2 v2.4.0 h1:O1e4Jfvr/hefNTNu+8VtdEG5lSeamJRo4aKhMOKNM64= @@ -45,12 +45,12 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= -github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0= github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE= +github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c/go.mod h1:l/bIBLeOl9eX+wxJAzxS4TveKRtAqlyDpHjhkfO0MEI= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -238,10 +238,13 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/twmb/murmur3 v1.1.5 h1:i9OLS9fkuLzBXjt6dptlAEyk58fJsSTXbRg3SgVyqgk= +github.com/twmb/murmur3 v1.1.5/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= +github.com/uber-go/tally/v4 v4.1.4 h1:LzQyYvWQIp1gYNWU2tDNzVl04H2VchEUvMgabx/7MTI= +github.com/uber-go/tally/v4 v4.1.4/go.mod h1:aXeSTDMl4tNosyf6rdU8jlgScHyjEGGtfJ/uwCIf/vM= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= @@ -249,14 +252,13 @@ github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgk github.com/xujiajun/gorouter v1.2.0/go.mod h1:yJrIta+bTNpBM/2UT8hLOaEAFckO+m/qmR3luMIQygM= github.com/xujiajun/mmap-go v1.0.1 h1:7Se7ss1fLPPRW+ePgqGpCkfGIZzJV6JPq9Wq9iv/WHc= github.com/xujiajun/mmap-go v1.0.1/go.mod h1:CNN6Sw4SL69Sui00p0zEzcZKbt+5HtEnYUsc6BKKRMg= -github.com/xujiajun/nutsdb v0.11.0 h1:uzQb3Tvib0R/5kwdt6AcEJJpHtmcunGGMsq63dhnyyM= -github.com/xujiajun/nutsdb v0.11.0/go.mod h1:sAT5Kr8+53X2r1eFMHw2VSPLSAo/PiJCZPK5QtMsw7g= +github.com/xujiajun/nutsdb v0.11.1 h1:zLyIvp3ABHMohtcqi0sbt7gGOFWfse+ZbLv2GVb6ZYw= +github.com/xujiajun/nutsdb v0.11.1/go.mod h1:sAT5Kr8+53X2r1eFMHw2VSPLSAo/PiJCZPK5QtMsw7g= github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 h1:w0si+uee0iAaCJO9q86T6yrhdadgcsoNuh47LrUykzg= github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235/go.mod h1:MR4+0R6A9NS5IABnIM3384FfOq8QFVnm7WDrBOhIaMU= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -265,11 +267,10 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= -go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8= -go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= +go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= +go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -309,7 +310,6 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -338,7 +338,6 @@ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= -golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220708220712-1185a9018129 h1:vucSRfWwTsoXro7P+3Cjlr6flUMtzCwzlvkxEQtHHB0= @@ -357,9 +356,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -395,9 +393,7 @@ golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -456,7 +452,6 @@ golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= -golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -546,17 +541,15 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/validator.v2 v2.0.0-20200605151824-2b28d334fa05/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/streaming-core/store/backend.go b/streaming-core/store/backend.go index 73f7726..063815a 100644 --- a/streaming-core/store/backend.go +++ b/streaming-core/store/backend.go @@ -4,6 +4,8 @@ import ( "fmt" "github.com/xujiajun/nutsdb" "go.uber.org/zap" + "os" + "path" "sort" "strconv" "sync" @@ -18,32 +20,14 @@ func parseCheckpointId(idStr string) int64 { return id } -// memory only for test -type memory struct { -} - -func (m *memory) Save(id int64, name string, state []byte) error { return nil } - -func (m *memory) Persist(checkpointId int64) error { return nil } - -func (m *memory) Get(name string) ([]byte, error) { return nil, nil } - -func (m *memory) Close() error { return nil } - -func NewMemoryBackend() Backend { - return &memory{} -} - type fs struct { logger *zap.Logger db *nutsdb.DB //storage stores all checkpoint state storage *sync.Map //checkpoints are currently completed checkpoint id sorted slice - checkpoints []int64 - //checkpointsTotalNum - checkpointsTotalNum int - checkpointsNumMerged int + checkpoints []int64 + checkpointDir string checkpointsNumRetained int } @@ -76,6 +60,26 @@ func (r *fs) init() error { } +func (r *fs) NeedMerge() bool { + files, _ := os.ReadDir(r.checkpointDir) + if len(files) == 0 { + return false + } + datNum := 0 + for _, f := range files { + id := f.Name() + fileSuffix := path.Ext(path.Base(id)) + if fileSuffix != nutsdb.DataSuffix { + continue + } + datNum += 1 + } + if datNum >= 2 { + return true + } + return false +} + // Save state according to checkpoint and operator name // if the checkpoint does not exist, will create func (r *fs) Save(checkpointId int64, name string, state []byte) error { @@ -111,16 +115,11 @@ func (r *fs) Persist(checkpointId int64) error { }); err != nil { return fmt.Errorf("failed to persist %d checkpoint state: %w", checkpointId, err) } - r.checkpointsTotalNum += 1 //2.clean up expired checkpoint status in db //3.clean up checkpoint status in memory - if r.checkpointsTotalNum%r.checkpointsNumRetained == 0 { + if len(r.checkpoints) > r.checkpointsNumRetained { if err := r.db.Update(func(tx *nutsdb.Tx) error { - var deletedCheckpointIds []int64 - if len(r.checkpoints) > r.checkpointsNumRetained { - deletedCheckpointIds = r.checkpoints[:len(r.checkpoints)-r.checkpointsNumRetained] - r.checkpoints = r.checkpoints[len(r.checkpoints)-r.checkpointsNumRetained:] - } + deletedCheckpointIds := r.checkpoints[:len(r.checkpoints)-r.checkpointsNumRetained] for _, deletedCheckpointId := range deletedCheckpointIds { if err := tx.DeleteBucket(nutsdb.DataStructureBPTree, formatCheckpointId(deletedCheckpointId)); err != nil { return err @@ -129,13 +128,14 @@ func (r *fs) Persist(checkpointId int64) error { for _, deletedCheckpointId := range deletedCheckpointIds { r.storage.Delete(deletedCheckpointId) } + r.checkpoints = r.checkpoints[len(r.checkpoints)-r.checkpointsNumRetained:] return nil }); err != nil { r.logger.Warn("failed to clear up expired checkpoint data.", zap.Error(err)) } } - if r.checkpointsTotalNum%r.checkpointsNumMerged == 0 { - //4.merge fs state + //4.merge fs state + if r.NeedMerge() { if err := r.db.Merge(); err != nil { r.logger.Warn("failed to merge fs state.", zap.Error(err)) } @@ -171,7 +171,7 @@ func (r *fs) Close() error { return r.db.Close() } -func NewFSBackend(logger *zap.Logger, checkpointsDir string, checkpointsNumRetained int, checkpointsNumMerged int) (Backend, error) { +func NewFSBackend(logger *zap.Logger, checkpointsDir string, checkpointsNumRetained int) (Backend, error) { opts := nutsdb.DefaultOptions opts.SegmentSize = 1 * nutsdb.GB opts.Dir = checkpointsDir @@ -184,8 +184,8 @@ func NewFSBackend(logger *zap.Logger, checkpointsDir string, checkpointsNumRetai db: db, storage: &sync.Map{}, checkpoints: []int64{}, + checkpointDir: checkpointsDir, checkpointsNumRetained: checkpointsNumRetained, - checkpointsNumMerged: checkpointsNumMerged, } return store, store.init() } diff --git a/streaming-core/store/backend_test.go b/streaming-core/store/backend_test.go index 7ddcace..76347f3 100644 --- a/streaming-core/store/backend_test.go +++ b/streaming-core/store/backend_test.go @@ -11,7 +11,7 @@ func tempFSBackend(checkpointsNumRetained int) (Backend, error) { if mkdirTemp, err := os.MkdirTemp("", ""); err != nil { return nil, err } else { - return NewFSBackend(zap.L(), mkdirTemp, checkpointsNumRetained, checkpointsNumRetained*3) + return NewFSBackend(zap.L(), mkdirTemp, checkpointsNumRetained) } } @@ -30,6 +30,6 @@ func TestFSBackendSaveAndGet(t *testing.T) { } func TestIteratorBucket(t *testing.T) { - backend, _ := NewFSBackend(zap.L(), "/Users/klein/GoLandProjects/athena/tmp", 1, 2) + backend, _ := NewFSBackend(zap.L(), "/Users/klein/GoLandProjects/athena/tmp", 1) _ = backend.Close() } diff --git a/streaming-core/stream/environment.go b/streaming-core/stream/environment.go index f5590d7..c70c885 100644 --- a/streaming-core/stream/environment.go +++ b/streaming-core/stream/environment.go @@ -231,7 +231,7 @@ func (e *Environment) Start() (err error) { //0. check stream graph and print //1. init all task - if e.storeBackend, err = store.NewFSBackend(e.logger.Named("stateBackend"), e.options.CheckpointsDir, e.options.CheckpointsNumRetained, e.options.CheckpointsNumRetained*3); err != nil { + if e.storeBackend, err = store.NewFSBackend(e.logger.Named("stateBackend"), e.options.CheckpointsDir, e.options.CheckpointsNumRetained); err != nil { return fmt.Errorf("failed to new fs store backend: %w", err) } diff --git a/streaming-operator/go.mod b/streaming-operator/go.mod index 956a158..bfa6725 100644 --- a/streaming-operator/go.mod +++ b/streaming-operator/go.mod @@ -2,5 +2,5 @@ module github.com/RuiFG/streaming/streaming-operator go 1.19 -require github.com/RuiFG/streaming/streaming-core v0.1.1 +require github.com/RuiFG/streaming/streaming-core v0.1.2