PipedInputStream and PipedOutputStream gotchas

2020-07-18 (last modified 2020-07-19)


To do I/O in Java, you read from InputStreams and write to OutputStreams. Sometimes you want to connect the output of an OutputStream to the input of an InputStream. In my case, I wanted to upload a CSV (generated by writing to a given OutputStream) to Amazon S3 (using a class reading from an InputStream).

I solved this problem using a PipedInputStream and PipedOutputStream. Reading from a PipedInputStream returns bytes written to the connected PipedOutputStream. Here was the Kotlin code I ended up with:

// Create a PipedInputStream; automatically close it at the end of
// the lambda
PipedInputStream().use { inputStream ->

  // Run the given lambda in a new thread
  thread {

    // Create a PipedOutputStream and connect it to the
    // PipedInputStream; automatically close it at the end of the
    // lambda
    PipedOutputStream(inputStream).use { outputStream ->
      generateCsv(outputStream)
    }
  }

  uploadToS3(inputStream)
}

These classes come with a couple of gotchas. First, you should write to the PipedOutputStream and read from the PipedInputStream on separate threads. (Each class’s documentation clearly states this, so this isn’t so much a gotcha as a case of me not reading the docs closely enough.) If you read and write on the same thread, you might encounter a deadlock.

The deadlock’s symptoms depend on whether your single-threaded code reads before it writes or vice versa. If it reads from the PipedInputStream first, it’ll hang forever waiting for something to write to the PipedOutputStream. However, if it writes to the PipedOutputStream first, you might not see any symptoms initially. In my case, I wrote a passing test that generated a small CSV and uploaded it to an S3 mock. However, when the code ran in production, it generated part of the CSV, then looped forever.

The problem is buffering. PipedInputStreams buffer 1,024 bytes by default. Like I did, you might also wrap the former in a BufferedReader or the latter in a BufferedWriter, which default to using 8,192-byte buffers. As long as the data you write fit into the largest buffer in the stream, you won’t notice any problems. Writing won’t block because the buffer can hold all the written bytes. Then the PipedInputStream can read all the data you wrote. But as soon as you write enough data to fill the buffer, your write will block and you’ll encounter a deadlock.

As the code above demonstrates, the solution is to read on one thread and write on another. That way, reading and writing can block without preventing the other operation from making progress.

After realizing this, I encountered the second gotcha. My code initially called generateCsv on the main thread and uploadToS3 on the thread created by thread. The issue is that, when generateCsv finishes, the main thread returns from the use block associated with the PipedInputStream. This closes the PipedInputStream even if uploadToS3 is in the middle of reading from it. In my case, I noticed that the CSV file written to the S3 mock was truncated.

To solve this problem, read on the main thread and write on the newly-created thread. This way, the PipedInputStream doesn’t close until uploadToS3 has read everything from it.

It took me a few hours to figure out what are in retrospect a couple of simple mistakes. My main takeaway is that I need to read documentation more carefully. I should also spend more time getting to know Java’s I/O primitives better.