
CRT engine fails to read objects larger than the window size with localstack #508

Closed
brizzbuzz opened this issue Jan 25, 2022 · 16 comments
Labels
bug This issue is a bug. closed-for-staleness

Comments

@brizzbuzz

brizzbuzz commented Jan 25, 2022

Describe the bug

Hey, this is strange, and is making me worry that I'm just doing something really dumb, but given that this SDK is in beta perhaps there is some genuine weirdness going on here.

I am trying to set up a simple read/write S3 service in Spring Boot using the Kotlin AWS SDK, and it seems like there is some really odd truncation going on with the ByteStream class, but only when the ByteStream comes from a downloaded object.

I have two very simple tests

       describe("S3 File Upload Service") {
          it("can upload and download a file from S3") {
              // arrange
              val fileContent = javaClass.classLoader.getResource("all_star.txt")?.readText()
                  ?: error("Where's ma txt file 😡")
              val fileKey = "swamp"

              // act
              fileStorageService.uploadFile(fileKey, fileContent.encodeToByteArray().inputStream())
              val result = fileStorageService.downloadFile(fileKey)

              // assert
              result.contentLength!! shouldBeExactly ByteStream.fromString(result.decodeToString()).contentLength!!
          }
          it("Pure bytestream test") {
              val fileContent = javaClass.classLoader.getResource("all_star.txt")?.readText()
                  ?: error("Where's ma txt file 😡")
              val bs = ByteStream.fromString(fileContent)

              bs.contentLength!! shouldBeExactly ByteStream.fromString(bs.decodeToString()).contentLength!!
          }
      }

where fileStorageService implements a very minimal interface

interface IFileStorageService {
    suspend fun uploadFile(key: String, stream: InputStream): Boolean
    suspend fun downloadFile(key: String): ByteStream
}

The weirdness is that in the first test, result.contentLength!! shouldBeExactly ByteStream.fromString(result.decodeToString()).contentLength!! fails, while in the second test, it succeeds (as expected).

In the first test, I get an error

79823 should be equal to 16384
java.lang.AssertionError: 79823 should be equal to 16384
	at my.service.file.storage.S3FileStorageServiceIT$3$1.invokeSuspend(S3FileStorageServiceIT.kt:55)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.internal.ScopeCoroutine.afterResume(Scopes.kt:33)
	at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:102)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
	at kotlinx.coroutines.UndispatchedCoroutine.afterResume(CoroutineContext.kt:142)
	at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:102)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Expected behavior

Decoding a byte stream and then encoding back to a byte stream should result in, if not completely identical streams, then at least streams with identical content lengths.

Current behavior

It seems to work, except when the ByteStream has been pulled from a getObject request.

Steps to Reproduce

Pretty simple: upload a file to S3 (in my case I'm using localstack to try all of this locally).

The file I'm using to test is the full transcript of Shrek, link here. This isn't a hard requirement for reproducing, but at the same time... it totally is.

Then just try to upload it, download it, and compare the decoded bytestream to the expected body.
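
In code, the round trip looks roughly like this (bucket/key names are placeholders, and import paths may differ slightly by SDK version):

import aws.sdk.kotlin.services.s3.S3Client
import aws.sdk.kotlin.services.s3.model.GetObjectRequest
import aws.sdk.kotlin.services.s3.model.PutObjectRequest
import aws.smithy.kotlin.runtime.content.ByteStream
import aws.smithy.kotlin.runtime.content.fromBytes
import aws.smithy.kotlin.runtime.content.toByteArray

// Upload the transcript, download it again, and compare sizes. Assumes `s3` is
// an S3Client already pointed at the localstack endpoint.
suspend fun roundTrip(s3: S3Client, transcript: ByteArray) {
    s3.putObject(PutObjectRequest {
        bucket = "testerino"
        key = "swamp"
        body = ByteStream.fromBytes(transcript)
    })

    val downloadedSize = s3.getObject(GetObjectRequest { bucket = "testerino"; key = "swamp" }) { resp ->
        // drain the body inside the closure, while the response is still valid
        resp.body!!.toByteArray().size
    }

    // against localstack with the CRT engine this comes back truncated for me
    check(downloadedSize == transcript.size) { "$downloadedSize should be equal to ${transcript.size}" }
}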

Possible Solution

No real idea. I'm wondering if there is any possibility that it has something to do with using localstack, but I would really rather not provision an actual bucket just to compare against my failed local testing.

Context

I'm just a simple man, trying to write to his bucket and read it back.

AWS Kotlin SDK version used

0.11.0-beta

Platform (JVM/JS/Native)

JVM

Operating System and version

macOS Big Sur

@brizzbuzz brizzbuzz added bug This issue is a bug. needs-triage This issue or PR still needs to be triaged. labels Jan 25, 2022
@aajtodd
Collaborator

aajtodd commented Jan 25, 2022

@unredundant Thanks for the detailed report.

I'm not able to recreate this locally.

val fileText = javaClass.classLoader.getResource("aws/sdk/kotlin/example/shrek.txt")?.readText() ?: error("expected shrek script")
val fileContent = fileText.encodeToByteArray()
val fileKey = "swamp"

S3Client.fromEnvironment { region = "us-east-2" }.use { s3 ->
    println("original content length: ${fileContent.size}")
    val putResp = s3.putObject {
        bucket = "<my-bucket>"
        key = fileKey
        body = ByteStream.fromBytes(fileContent)
    }

    println(putResp)

    s3.getObject(GetObjectRequest { bucket = "aaron22-throwaway"; key = fileKey }) { resp ->
        println("content length: ${resp.body?.contentLength}")

        val bs = ByteStream.fromString(resp.body!!.decodeToString())
        println("reconstituted content length: ${bs.contentLength}")
    }
}

All of these print out a content length of 81645 which matches what is on disk.

I'd be interested to see what the implementation of IFileStorageService looks like. I suspect there is an issue there.

Also, a couple of friendly pointers:

  1. It looks like you are returning a ByteStream from downloadFile. How are you managing its lifetime? The GetObjectResponse is only valid until the end of the closure. See these docs for more info, and the sketch after this list. I suspect the connection is being closed and truncating your result.
  2. The ByteStream type has extensions for creating or consuming content from ByteArray, File, and Path in addition to String. You have several intermediate conversions to and from string that aren't necessary (see the same docs here).
  3. You may not necessarily want to use an InputStream since it is a blocking call to consume data from. ByteStream provides abstractions for consuming data in a coroutine-friendly manner. In particular it handles files out of the box, using the correct dispatcher and without you having to buffer the whole thing in memory.
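
To make points 1 and 2 concrete, here's a rough sketch (the bucket name is a placeholder, and the exact extension/import paths may vary slightly by version):

import aws.sdk.kotlin.services.s3.S3Client
import aws.sdk.kotlin.services.s3.model.GetObjectRequest
import aws.sdk.kotlin.services.s3.model.PutObjectRequest
import aws.smithy.kotlin.runtime.content.ByteStream
import aws.smithy.kotlin.runtime.content.fromFile
import aws.smithy.kotlin.runtime.content.writeToFile
import java.io.File

// Upload straight from a file: no InputStream, no intermediate String, and the
// content is streamed rather than buffered entirely in memory.
suspend fun upload(s3: S3Client, key: String, file: File) {
    s3.putObject(PutObjectRequest {
        bucket = "my-bucket"
        this.key = key
        body = ByteStream.fromFile(file)
    })
}

// Consume the body *inside* the closure (here, straight to a file) instead of
// returning the ByteStream, which is only valid until the closure returns.
suspend fun download(s3: S3Client, key: String, dest: File) {
    s3.getObject(GetObjectRequest { bucket = "my-bucket"; this.key = key }) { resp ->
        resp.body?.writeToFile(dest)
    }
}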

@aajtodd aajtodd added response-requested Waiting on additional info and feedback. Will move to 'closing-soon' in 5 days. and removed needs-triage This issue or PR still needs to be triaged. labels Jan 25, 2022
@brizzbuzz
Author

Thanks for the quick response, really appreciate it :)

I suspect the connection is being closed and truncating your result.

Hmmm, so here is my current quick and dirty implementation that is causing problems.

@Service
class S3FileStorageService @Autowired constructor(val client: S3Client) : IFileStorageService {

    // todo encryption?
    override suspend fun uploadFile(key: String, stream: InputStream): Boolean {
        val body = ByteStream.fromBytes(stream.readBytes())
        val result = client.putObject {
            this.bucket = "testerino"
            this.key = key
            this.body = body
        }
        return true
    }

    override suspend fun downloadFile(key: String): ByteStream = client.getObject(GetObjectRequest.invoke {
        this.bucket = "testerino"
        this.key = key
    }) {
        it.body!!
    }
}

I've tried a couple of misc modifications, like switching the return type of downloadFile to ByteArray and instead doing it.body!!.toByteArray(), but nothing seems to do the trick :(

[Screenshot: Screen Shot 2022-01-25 at 10 52 33 AM]

What's odd is that I would assume that if my response were being closed mid-stream, I would get non-deterministic results, but I get this exact character count on every run 🤔

You have several intermediate conversions to and from string that aren't necessary

Yep, I pretty much only have these cuz I was going crazy trying to figure out exactly where this truncation is happening :') There's definitely room to streamline, but none of them should really be causing this truncation, correct?

You may not necessarily want to use an InputStream since it is a blocking call to consume data from

Good call! I copy pasta'd that interface from an old synchronous impl. Will definitely change that.

@brizzbuzz
Author

I might try this against an actual s3 bucket, I'm wondering if there is some possible localstack weirdness going on?

@aajtodd
Collaborator

aajtodd commented Jan 25, 2022

What's odd is that I would assume that if my response were being closed mid-stream, I would get non-deterministic results

Agreed that is odd if that is what is happening, although you could always add a delay somewhere and see if your results change a bit.

    override suspend fun downloadFile(key: String): ByteStream = client.getObject(GetObjectRequest.invoke {
        this.bucket = "testerino"
        this.key = key
    }) {
        it.body!!
    }

Either way, this is definitely an issue. The response body is only valid inside the given closure, so any attempt to read it outside the closure is invalid. You said you tried it.body!!.toByteArray(); that should work (and does for me locally) since it will consume the entire stream before leaving the closure.
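
For reference, here's a sketch of downloadFile drained inside the closure and returning a ByteArray instead (which also means changing the interface's return type, as you mentioned trying); toByteArray is the extension from aws.smithy.kotlin.runtime.content:

override suspend fun downloadFile(key: String): ByteArray =
    client.getObject(GetObjectRequest { this.bucket = "testerino"; this.key = key }) {
        // drain the stream here, while the response is still valid
        it.body?.toByteArray() ?: ByteArray(0)
    }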

I'm wondering if there is some possible localstack weirdness going on?

It wouldn't hurt to rule it out.

Let us know what you find, very interested to understand what is going on here.

@brizzbuzz
Author

So, I'm still waiting on company SRE to get me a real bucket with perms to test against, but in the meantime, I went ahead and cut out all of this code into a demo repo. I've confirmed that I get the same error as in my actual work repo. https://github.com/unredundant/kt-aws-s3-bug-demo

I wanted to try it on my personal machine, but I have an M1 Mac and was experiencing the same error as this report #473, so I figured I'd just push it :)

Would be really curious to hear if you get the same result in the repo I posted.

@github-actions github-actions bot removed the response-requested Waiting on additional info and feedback. Will move to 'closing-soon' in 5 days. label Jan 25, 2022
@brizzbuzz
Author

Ok, I haven't cracked the code yet, but a little debugging has led me down an interesting path.

The content length of the file is 79823. However, by the time the SDK tries to run readAsMuchAsPossible inside the AbstractBufferedReadChannel, the availableForRead field has, for some reason, been set to 16384. This at least explains how, on every single read, I get the exact same snippet of the text file.

So I don't think the problem is that the connection is closing ahead of the read, but rather that something is causing availableForRead to not understand that there is in fact more to read.

@brizzbuzz
Author

[Screenshot: Screen Shot 2022-01-26 at 9 25 53 AM]

@aajtodd
Collaborator

aajtodd commented Jan 26, 2022

Ok, I haven't cracked the code yet, but a little debugging has led me down an interesting path.

The content length of the file is 79823. However, by the time the SDK tries to run readAsMuchAsPossible inside the AbstractBufferedReadChannel, the availableForRead field has, for some reason, been set to 16384. This at least explains how, on every single read, I get the exact same snippet of the text file.

So I don't think the problem is that the connection is closing ahead of the read, but rather that something is causing availableForRead to not understand that there is in fact more to read.

Interesting. So 16384 is the default window size of the underlying BufferedReadChannel used by the CRT HTTP engine. This is the amount of data it will read from the socket before pausing and waiting for the consumer to start draining it (to keep from buffering too much in memory). That explains why it's the same every time, at least.

availableForRead is just the amount that is immediately available for read without suspending. I'd be interested to know what isClosedForRead and isClosedForWrite are. The CRT buffered read channel implementation is here, specifically the readRemaining function (which should drain availableForRead and then suspend until up to the limit has been read, which in this case should be draining the entire thing).
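
As an analogy (a plain coroutines channel, not the SDK's actual implementation), the intended behavior is: drain whatever is buffered, the window reopens, and reading continues until the stream is genuinely closed:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Analogy only: a bounded channel behaves like the flow-control window -- the
// producer suspends once the buffer is full and resumes as the consumer drains
// it, so a working read loop sees all of the data, not just the first window.
fun main() = runBlocking {
    val window = Channel<ByteArray>(capacity = 1) // "window" of one 16 KiB segment

    launch {
        repeat(5) { window.send(ByteArray(16_384)) } // suspends whenever the window is full
        window.close()
    }

    var total = 0
    for (segment in window) total += segment.size // draining lets the producer continue
    println("read $total bytes") // 81920 -- all five segments, not just the first 16384
}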

I'll try and take a look at this as well and see if I can reproduce.

@brizzbuzz
Author

So, it looks like isClosedForRead flips from false to true between readAsMuchAsPossible and readRemainingSuspend.

[Screenshot: Screen Shot 2022-01-26 at 10 56 16 AM]

[Screenshot: Screen Shot 2022-01-26 at 10 56 28 AM]

And then looking at the available segments, _next is of type Closed

[Screenshot: Screen Shot 2022-01-26 at 11 00 58 AM]


Also, just another weird little thing I noticed: you guys have a check in place

private suspend fun readRemainingSuspend(buffer: SdkByteBuffer, limit: Int): ByteArray {
  check(currSegment.value == null) { "current segment should be drained already" }
  // ...

that references the field private val currSegment: AtomicRef<Segment?> = atomic(null). I'm not familiar with this AtomicFU library, but the strange thing is that currSegment itself is null

[Screenshot: Screen Shot 2022-01-26 at 11 07 19 AM]

which is strange, because currSegment itself isn't nullable... if it were, wouldn't the definition need to be private val currSegment: AtomicRef<Segment?>? Or does atomicfu somehow make null checks permeate from the inner type?

@aajtodd
Collaborator

aajtodd commented Jan 26, 2022

which is strange, because currSegment itself isn't nullable... if it were, wouldn't the definition need to be private val currSegment: AtomicRef<Segment?>? Or does atomicfu somehow make null checks permeate from the inner type?

I'm not sure what the debugger is doing, but the code itself is checking the inner value of the AtomicRef. atomicfu is (more or less) just a KMP implementation of the Java Atomic* APIs.
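
A tiny standalone example of the same pattern, using kotlinx-atomicfu directly (nothing SDK-specific):

import kotlinx.atomicfu.atomic

private class Demo {
    // The AtomicRef wrapper itself is non-null; it's the *inner* value that is
    // nullable, mirroring `private val currSegment: AtomicRef<Segment?> = atomic(null)`.
    private val currSegment = atomic<String?>(null)

    fun checkDrained() {
        // The check inspects the inner value via .value (like AtomicReference.get() on the JVM)
        check(currSegment.value == null) { "current segment should be drained already" }
    }

    fun setSegment(segment: String) {
        currSegment.value = segment
    }
}

fun main() {
    val demo = Demo()
    demo.checkDrained()          // passes: the inner value is null even though currSegment is not
    demo.setSegment("segment-0") // after this, checkDrained() would throw
}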


I was able to reproduce with your example repo, thanks for such a detailed and easy reproduction.

OK, starting to get a better picture of what is happening with some trace logging of the CRT engine:

[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [http-stream] - id=0x7fc2b9933db0: Flow-control window has reached 0. No more data can be received until window is updated.                    
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [task-scheduler] - id=0x7fc2b9f83cd0: Scheduling window update task task for immediate execution                                               
[INFO] [2022-01-26T16:41:28Z] [0000700004bd4000] [socket] - id=0x7fc2b9c97f40 fd=198: zero read, socket is closed                                                                                
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [task-scheduler] - id=0x7fc2b9f83c48: Scheduling channel_shutdown task for immediate execution                                                 
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [task-scheduler] - id=0x7fc2b9f83cd0: Running window update task task with <Running> status                                                    
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [task-scheduler] - id=0x7fc2b9f83d78: Scheduling socket_handler_read_on_window_increment task for immediate execution                          
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [task-scheduler] - id=0x7fc2b9f83c48: Running channel_shutdown task with <Running> status                                                      
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [channel] - id=0x7fc2b9f83a30: beginning shutdown process                                                                                      
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [channel] - id=0x7fc2b9f83a30: handler 0x7fc2b9f83d40 shutdown in read dir completed.                                                          
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [channel] - id=0x7fc2b9f83a30: handler 0x7fc2b9f83e78 shutdown in read dir completed.                                                          
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [task-scheduler] - id=0x7fc2b9f83a58: Scheduling (null) task for immediate execution                                                           
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [task-scheduler] - id=0x7fc2b9f83d78: Running socket_handler_read_on_window_increment task with <Running> status                               
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [task-scheduler] - id=0x7fc2b9f83a58: Running (null) task with <Running> status                                                                
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [http-stream] - id=0x7fc2b9933db0: Stream completed with error code 1051 (AWS_IO_SOCKET_CLOSED).                                               
[INFO] [2022-01-26T16:41:28Z] [0000700004bd4000] [http-connection] - id=0x7fc2b9f83e70: Shutting down connection with error code 0 (AWS_ERROR_SUCCESS).                                          
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [channel] - id=0x7fc2b9f83a30: Channel shutdown is already pending, not scheduling another.
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [channel] - id=0x7fc2b9f83a30: handler 0x7fc2b9f83e78 shutdown in write dir completed.                                                         
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [socket] - id=0x7fc2b9c97f40 fd=198: closing
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [task-scheduler] - id=0x7fc2b9f83740: Scheduling kqueue_event_loop_clean_up_handle_data task for immediate execution
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [task-scheduler] - id=0x7fc2b9f83de0: Scheduling socket_handler_close task for immediate execution
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [task-scheduler] - id=0x7fc2b9f83740: Running kqueue_event_loop_clean_up_handle_data task with <Running> status                                
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [task-scheduler] - id=0x7fc2b9f83de0: Running socket_handler_close task with <Running> status
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [channel] - id=0x7fc2b9f83a30: handler 0x7fc2b9f83d40 shutdown in write dir completed.
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [task-scheduler] - id=0x7fc2b9f83a58: Scheduling (null) task for immediate execution
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [task-scheduler] - id=0x7fc2b9f83a58: Running (null) task with <Running> status
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [channel-bootstrap] - id=0x7fc2be235a50: channel 0x7fc2b9f83a30 shutdown with error 1051.                                                      
[INFO] [2022-01-26T16:41:28Z] [0000700004bd4000] [http-connection] - 0x7fc2b9f83e70: Client shutdown completed with error 1051 (AWS_IO_SOCKET_CLOSED).
[DEBUG] [2022-01-26T16:41:28Z] [0000700004bd4000] [connection-manager] - id=0x7fc2bd1aa460: shutdown received for connection (id=0x7fc2b9f83e70)             

It looks like what is happening is that the window fills up and then the socket is closed for read.

I believe we have a small bug where this error is not getting propagated correctly here. I believe this ought to be passing the cause in so that it reads segments.close(cause) (I'll have to dive into it a bit to be sure that's correct, but on first read it looks like a miss to me).
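
To illustrate why passing the cause matters (again with a plain coroutines channel, not the SDK's code): close() makes the reader see a normal end-of-stream, so a truncated body looks like a successful read, whereas close(cause) surfaces the failure:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.IOException

fun main() = runBlocking {
    val segments = Channel<ByteArray>(Channel.UNLIMITED)

    launch {
        segments.send(ByteArray(16_384))
        // segments.close()                             // silent: the reader just stops early
        segments.close(IOException("socket is closed")) // the reader observes the error instead
    }

    var total = 0
    try {
        for (segment in segments) total += segment.size // throws the close cause once drained
        println("read $total bytes")
    } catch (e: IOException) {
        println("read $total bytes before failure: ${e.message}")
    }
}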

Indeed when I do pass the underlying cause it propagates up to the test with an exception:

aws.sdk.kotlin.runtime.ClientException: CrtHttpEngine::response failed: ec=1051; description=socket is closed.           

As to why this is happening, I'm not sure yet, but it may be related to localstack. I'd be interested to know if you're able to test this against a real S3 bucket and still see the same thing.

@aajtodd
Collaborator

aajtodd commented Jan 26, 2022

I don't see this behavior with the KtorEngine (based on OkHttp), which means it may indeed be an issue with the CRT HTTP client somewhere. You can temporarily work around this in your tests by using a different HTTP client engine:

diff --git a/file-storage/build.gradle.kts b/file-storage/build.gradle.kts
index 209ec10..7dafd6e 100644
--- a/file-storage/build.gradle.kts
+++ b/file-storage/build.gradle.kts
@@ -68,7 +74,9 @@ testing {
 
                 // AWS
                 implementation("com.amazonaws:aws-java-sdk-core:1.12.145") // Used ONLY for localstack credential provider
+                implementation("aws.smithy.kotlin:http-client-engine-ktor:0.7.7-SNAPSHOT")
+                implementation("com.squareup.okhttp3:okhttp:4.9.2")
 
                 // Mockk
                 implementation("io.mockk:mockk:1.12.1")
diff --git a/file-storage/src/testIntegration/kotlin/testerino/S3ClientConfig.kt b/file-storage/src/testIntegration/kotlin/testerino/S3ClientConfig.kt
index 79d3271..fb3e776 100644
--- a/file-storage/src/testIntegration/kotlin/testerino/S3ClientConfig.kt
+++ b/file-storage/src/testIntegration/kotlin/testerino/S3ClientConfig.kt
@@ -9,9 +9,14 @@ import org.springframework.context.annotation.Configuration
 import org.springframework.context.annotation.Profile
 import org.testcontainers.containers.localstack.LocalStackContainer
 
+import aws.smithy.kotlin.runtime.http.engine.ktor.KtorEngine
+
 @Profile("integration-test")
 @Configuration
 class S3ClientConfig {
 
     @Autowired
     private lateinit var localstack: LocalStackContainer
@@ -19,6 +24,7 @@ class S3ClientConfig {
     @Bean
     fun s3Client(): S3Client = S3Client {
         region = "us-east-1"
+        httpClientEngine = KtorEngine()
         endpointResolver = AwsEndpointResolver { _, _ ->
             AwsEndpoint(
                 localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()

The explicit dependency on com.squareup.okhttp3:okhttp:4.9.2 shouldn't be necessary, but I hit a runtime issue and found a dependency conflict from somewhere causing the resolved version to be 3.x.

Full disclosure: the KtorEngine hasn't seen much use lately. We have plans to ensure that it and any future HTTP engine are well maintained and tested against a common test suite, but we haven't gotten to it yet. I expect it still works just fine, but consider this an FYI.

Hopefully this can unblock you. I'll leave it open for now to see if anyone else has similar issues working with localstack. I'm still interested if you see this on a real bucket though so please let us know if you see issues there.

@aajtodd aajtodd changed the title ByteStream loses data on decodeToString _only_ when retrieved from getObject CRT engine fails to read objects larger than the window size with localstack Jan 26, 2022
@brizzbuzz
Author

Can confirm that the workaround you posted works for me. The only caveat is that I had to use aws.smithy.kotlin:http-client-engine-ktor:0.7.6, as I'm not sure where the snapshot jars get published 😅

I will follow up with info on the real bucket attempt soon.

Really appreciate the help! Very excited to see this SDK moving towards stable :)

@brizzbuzz
Author

I was able to run this against a live S3 bucket without the Ktor engine override, so this does seem to be localstack-specific.

@aajtodd
Collaborator

aajtodd commented Jan 26, 2022

Thanks for confirming. I'll open a ticket with the CRT team to see if they can track anything down, but since it's specific to localstack, no further action will be taken on this right now.

@vcanuel

vcanuel commented Feb 3, 2022

Hi,

I ran into the same issue on localstack. I had to switch to KtorEngine as suggested.

My code is fairly simple:

suspend fun downloadToFile(keyName: String, bucketName: String? = null): File? {
    val request = GetObjectRequest {
        key = keyName
        bucket = bucketName ?: defaultBucket
    }
    return try {
        amazonS3Client.getObject(request) { resp ->
            if (resp.body != null) {
                val file = kotlin.io.path.createTempFile()
                resp.body?.writeToFile(file)
                file.toFile()
            } else {
                null
            }
        }
    } catch (e: Exception) {
        null
    }
}

Calling this function from a Spring Boot controller results in a corrupted temp file (the size was totally random).

@github-actions

github-actions bot commented Feb 6, 2023

This is a very old issue that is probably not getting as much attention as it deserves. We encourage you to check if this is still an issue in the latest release and if you find that this is still a problem, please feel free to provide a comment or open a new issue.

@github-actions github-actions bot added closing-soon This issue will automatically close in 2 days unless further comments are made. closed-for-staleness and removed closing-soon This issue will automatically close in 2 days unless further comments are made. labels Feb 6, 2023
@github-actions github-actions bot closed this as completed Feb 8, 2023