Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to use stream: true? #18

Closed
raphaelrk opened this issue Jul 19, 2022 · 157 comments
Closed

How to use stream: true? #18

raphaelrk opened this issue Jul 19, 2022 · 157 comments
Labels
fixed in v4 Issues addressed by v4

Comments

@raphaelrk
Copy link

raphaelrk commented Jul 19, 2022

I'm a bit lost as to how to actually use stream: true in this library.

Example incorrect syntax:

const res = await openai.createCompletion({
  model: "text-davinci-002",
  prompt: "Say this is a test",
  max_tokens: 6,
  temperature: 0,
  stream: true,
});

res.onmessage = (event) => {
  console.log(event.data);
}
@schnerd
Copy link
Collaborator

schnerd commented Oct 20, 2022

Unfortunately streaming is not currently supported by this library 😢

I'm not sure if the SDK auto-generation tool we use (openai-generator) is able to support event streams. Will have to do more research.

The python openai package does support it: https://pypi.org/project/openai/

If anyone knows of a good way to consume server-sent events in Node (that also supports POST requests), please share!

@keraf
Copy link

keraf commented Nov 22, 2022

If anyone knows of a good way to consume server-sent events in Node (that also supports POST requests), please share!

This can be done with the request method of Node's https API. You can create a request with the options you want (such as POST as a method) and then read the streamed data using the data event on the response. You can also use the close event to know when the request has finished.

@schnerd
Copy link
Collaborator

schnerd commented Nov 30, 2022

Thanks @keraf, we'll try to look into getting this working soon.

@smervs
Copy link

smervs commented Dec 1, 2022

You can use axios stream response type. But you still need to parse the returned data.

const res = await openai.createCompletion({
  model: "text-davinci-002",
  prompt: "Say this is a test",
  max_tokens: 6,
  temperature: 0,
  stream: true,
}, { responseType: 'stream' });

res.on('data', console.log)

@LasseSander
Copy link

LasseSander commented Dec 4, 2022

Thanks! @smervs currently getting: Property 'on' does not exist on type 'AxiosResponse<CreateCompletionResponse, any>' when trying though - have you had any luck?

@smervs
Copy link

smervs commented Dec 4, 2022

Thanks! @smervs currently getting: Property 'on' does not exist on type 'AxiosResponse<CreateCompletionResponse, any>' when trying though - have you had any luck?

can you try this?

res.data.on('data', console.log)

@mattgabor
Copy link

@smervs your code is working for me, but it logs as

<Buffer 64 61 74 61 3a 20 7b 22 69 64 22 3a 20 22 63 6d 70 6c 2d 36 4a 6e 56 35 4d 70 4d 41 44 4f 41 61 56 74 50 64 30 56 50 72 45 42 4f 62 34 48 54 6c 22 2c ... 155 more bytes>

Do you know how to parse this response?

@smervs
Copy link

smervs commented Dec 5, 2022

@smervs your code is working for me, but it logs as

<Buffer 64 61 74 61 3a 20 7b 22 69 64 22 3a 20 22 63 6d 70 6c 2d 36 4a 6e 56 35 4d 70 4d 41 44 4f 41 61 56 74 50 64 30 56 50 72 45 42 4f 62 34 48 54 6c 22 2c ... 155 more bytes>

Do you know how to parse this response?

here

res.data.on('data', data => console.log(data.toString()))

@brianfoody
Copy link

brianfoody commented Dec 13, 2022

This format still waits and gives you the entire response at the end though no? Is there not a way to get the results as they stream back as per the OpenAI frontend?

@Awendel
Copy link

Awendel commented Dec 14, 2022

I second this, streaming experience is currently not good and only seems to return all chunks in bulk instead of as they come in.

This is especially problematic with large responses, where it takes a long time to finish - a much better user experience would be to show early tokens as they come in - really just being able to match Playground UX.

A pure HTTP example using request / curl would also be fine for now, would be happy to create a higher level utility function once I see a working example

@Awendel
Copy link

Awendel commented Dec 14, 2022

I solved it using the inbuilt node http / https module:

const prompt = "Sample prompt. What's 2+2?"

const req = https.request({
	hostname:"api.openai.com",
	port:443,
	path:"/v1/completions",
	method:"POST",
	headers:{
		"Content-Type":"application/json",
		"Authorization":"Bearer "+ KEY_API  
	}
}, function(res){
	res.on('data', (chunk) => {
		console.log("BODY: "+chunk);
	});
	res.on('end', () => {
		console.log('No more data in response.');
	});
})

const body = JSON.stringify({
	model:"text-davinci-003",
	prompt:prompt,
	temperature:0.6,
	max_tokens:512,
	top_p:1.0,
	frequency_penalty:0.5,
	presence_penalty:0.7,
	stream:true
})

req.on('error', (e) => {
	console.error("problem with request:"+e.message);
		});

req.write(body)

req.end()

@FaceMr
Copy link

FaceMr commented Dec 28, 2022

java okHttpClient

BufferedSource source = response.body().source(); Buffer buffer = new Buffer(); StringBuilder result = new StringBuilder(); while (!source.exhausted()) { long count = response.body().source().read(buffer, 8192); // handle data in buffer. String r = buffer.readUtf8(); log.info("result:" + r); result.append(r); buffer.clear(); }

result eg :
(非常多的这样的数据)
data: {"id": "cmpl-xxxx", "object": "text_completion", "created": 1672230176, "choices": [{"text": "\u672f", "index": 0, "logprobs": null, "finish_reason": null}], "model": "text-davinci-003"}
data: {"id": "cmpl-xxxx", "object": "text_completion", "created": 1672230176, "choices": [{"text": "\uff1a", "index": 0, "logprobs": null, "finish_reason": null}], "model": "text-davinci-003"}

@Awendel
Copy link

Awendel commented Dec 28, 2022

Yes I also found this strange, sometimes the OpenAI API returns multiple segments of
data: {}
that are not comma seperated and hence hard to parse as JSON
What I did:
string replace all "data: {" with ", {" instead of the first occurence (there just use "{")

then it can be parsed via JSON.parse, and one can extract all the text parts via .choices[0].text

@ghost
Copy link

ghost commented Dec 29, 2022

In my use case streams is more useful for the request data though, so that you can concatenate the results from different requests.

There is no dependency here apart from dotenv.

This is for the response anyways. Uses fetch which is now built into node v19 (and prev. versions using experimental flags)

See code
import * as dotenv from 'dotenv';

// I just used a story as a string with backticks
import { text } from './string.mjs';
dotenv.config();

const apiUrl = 'https://api.openai.com/v1/completions';
const apiKey = process.env.OPENAI_API_KEY;

const fetchOptions = {
  method: 'POST',
  headers: {
    Accept: 'application/json',
    'Content-Type': 'application/json',
    Authorization: `Bearer ${apiKey}`,
  },
  body: JSON.stringify({
    model: 'text-davinci-003',
    //queues the model to return a summary, works fine.
    prompt: `Full Text: ${text}
         Summary:`,
    temperature: 0,
    max_tokens: 1000,
    presence_penalty: 0.0,
    stream: true,
    //    stop: ['\n'],
  }),
};

fetch(apiUrl, fetchOptions).then(async (response) => {
  const r = response.body;
  if (!r) throw new Error('No response body');
   
  const d = new TextDecoder('utf8');
  const reader = await r.getReader();
  let fullText = ''
  while (true) {
    const { value, done } = await reader.read();
    if (done) {
      console.log('done');
      break;
    } else {
      const decodedString = d.decode(value);
      console.log(decodedString);
      try {
        //fixes string not json-parseable otherwise
        fullText += JSON.parse(decodedString.slice(6)).choices[0].text;
      } catch (e) {
        // the last line is data: [DONE] which is not parseable either, so we catch that.
        console.log(
          e, '\n\n\n\n'
          'But parsed string is below\n\n\n\n',
        );
        console.log(fullText);
      }
    }
  }
});

Also simplest code without any library:

See code
/* eslint-disable camelcase */
import * as dotenv from 'dotenv';

import { text } from './string.mjs';

//populates `process.env` with .env variables
dotenv.config();

const apiUrl = 'https://api.openai.com/v1/completions';
const apiKey = process.env.OPENAI_API_KEY;

const fetchOptions = {
  method: 'POST',
  headers: {
    Accept: 'application/json',
    'Content-Type': 'application/json',
    Authorization: `Bearer ${apiKey}`,
  },
  body: JSON.stringify({
    model: 'text-davinci-003',
    prompt: `Full Text: ${text}
         Summary:`,
    temperature: 0,
    max_tokens: 1000,
    presence_penalty: 0.0,
    //    stream: true,
    //    stop: ['\n'],
  }),
};

fetch(apiUrl, fetchOptions).then(async (response) => {
  const r = await response.json();
  console.log(r);
});

@gfortaine
Copy link

gfortaine commented Jan 3, 2023

Many thanks for this very insightful discussion 👍

As a side note, it looks like that one could consume Server-Sent Events in Node and at the same supports POST requests (even if it is not spec compliant given that only GET requests should be allowed) cc @schnerd :

@microsoft/fetch-event-source

launchdarkly-eventsource

However, it appears that we would lose all the benefits of SDK auto-generation tool. Moreover, it seems that the only TS generator supporting stream at the time of writing is the axios one (typescript-fetch doesn’t expose a method to consume the body as stream).

Hence, @smervs' answer is perfectly valid and should be the accepted one. However, we could enhance it, especially regarding the parser because a few options exist. By example, if we take the one from a customized @microsoft/fetch-event-source (note : the package has been specially retrofitted for the purpose by exporting ./parse), here is the result :

http://www.github.com/gfortaine/fortbot

import { Configuration, OpenAIApi } from "openai";
import * as parse from "@fortaine/fetch-event-source/parse";

const configuration = new Configuration({
  apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);

const prompt = "Hello world";
// https://help.openai.com/en/articles/4936856-what-are-tokens-and-how-to-count-them
const max_tokens = 4097 - prompt.length;

const completion = await openai.createCompletion(
  {
    model: "text-davinci-003",
    max_tokens,
    prompt,
    stream: true,
  },
  { responseType: "stream" }
);

completion.data.on(
  "data",
  parse.getLines(
    parse.getMessages((event) => {
      const { data } = event;

      // https://beta.openai.com/docs/api-reference/completions/create#completions/create-stream
      if (data === "[DONE]") {
        process.stdout.write("\n");
        return;
      }

      const { text } = JSON.parse(data).choices[0];
      process.stdout.write(text);
    })
  )
);

@schnerd
Copy link
Collaborator

schnerd commented Jan 3, 2023

@gfortaine we actually use @microsoft/fetch-event-source for the playground to do streaming with POST 👍

Thank you all for sharing your solutions here! I agree that @smervs solution currently looks like the best option available for the openai-node package. Here's a more complete example with proper error handling and no extra dependencies:

try {
    const res = await openai.createCompletion({
        model: "text-davinci-002",
        prompt: "It was the best of times",
        max_tokens: 100,
        temperature: 0,
        stream: true,
    }, { responseType: 'stream' });
    
    res.data.on('data', data => {
        const lines = data.toString().split('\n').filter(line => line.trim() !== '');
        for (const line of lines) {
            const message = line.replace(/^data: /, '');
            if (message === '[DONE]') {
                return; // Stream finished
            }
            try {
                const parsed = JSON.parse(message);
                console.log(parsed.choices[0].text);
            } catch(error) {
                console.error('Could not JSON parse stream message', message, error);
            }
        }
    });
} catch (error) {
    if (error.response?.status) {
        console.error(error.response.status, error.message);
        error.response.data.on('data', data => {
            const message = data.toString();
            try {
                const parsed = JSON.parse(message);
                console.error('An error occurred during OpenAI request: ', parsed);
            } catch(error) {
                console.error('An error occurred during OpenAI request: ', message);
            }
        });
    } else {
        console.error('An error occurred during OpenAI request', error);
    }
}

This could probably be refactored into a streamCompletion helper function (that uses either callbacks or es6 generators to emit new messages).

Apologies there's not an easier way to do this within the SDK itself – the team will continue evaluating how to get this added natively, despite the lack of support in the current sdk generator tool we're using.

@gfortaine
Copy link

gfortaine commented Jan 4, 2023

@schnerd Please find a PR : #45, as well as an updated example. Comments are welcome 👍 :

http://www.github.com/gfortaine/fortbot

import { Configuration, OpenAIApi } from "@fortaine/openai";
import { streamCompletion } from "@fortaine/openai/stream";

const configuration = new Configuration({
  apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);

try {
  const completion = await openai.createCompletion(
    {
      model: "text-davinci-003",
      max_tokens: 100,
      prompt: "It was the best of times",
      stream: true,
    },
    { responseType: "stream" }
  );

  for await (const message of streamCompletion(completion.data)) {
    try {
      const parsed = JSON.parse(message);
      const { text } = parsed.choices[0];

      process.stdout.write(text);
    } catch (error) {
      console.error("Could not JSON parse stream message", message, error);
    }
  }

  process.stdout.write("\n");
} catch (error) {
  if (error.response?.status) {
    console.error(error.response.status, error.message);

    for await (const data of error.response.data) {
      const message = data.toString();

      try {
        const parsed = JSON.parse(message);

        console.error("An error occurred during OpenAI request: ", parsed);
      } catch (error) {
        console.error("An error occurred during OpenAI request: ", message);
      }
    }
  } else {
    console.error("An error occurred during OpenAI request", error);
  }
}

@gfortaine
Copy link

gfortaine commented Jan 5, 2023

@gfortaine we actually use @microsoft/fetch-event-source for the playground to do streaming with POST 👍

Thank you all for sharing your solutions here! I agree that @smervs solution currently looks like the best option available for the openai-node package. Here's a more complete example with proper error handling and no extra dependencies:

try {
    const res = await openai.createCompletion({
        model: "text-davinci-002",
        prompt: "It was the best of times",
        max_tokens: 100,
        temperature: 0,
        stream: true,
    }, { responseType: 'stream' });
    
    res.data.on('data', data => {
        const lines = data.toString().split('\n').filter(line => line.trim() !== '');
        for (const line of lines) {
            const message = line.replace(/^data: /, '');
            if (message === '[DONE]') {
                return; // Stream finished
            }
            try {
                const parsed = JSON.parse(message);
                console.log(parsed.choices[0].text);
            } catch(error) {
                console.error('Could not JSON parse stream message', message, error);
            }
        }
    });
} catch (error) {
    if (error.response?.status) {
        console.error(error.response.status, error.message);
        error.response.data.on('data', data => {
            const message = data.toString();
            try {
                const parsed = JSON.parse(message);
                console.error('An error occurred during OpenAI request: ', parsed);
            } catch(error) {
                console.error('An error occurred during OpenAI request: ', message);
            }
        });
    } else {
        console.error('An error occurred during OpenAI request', error);
    }
}

This could probably be refactored into a streamCompletion helper function (that uses either callbacks or es6 generators to emit new messages).

Apologies there's not an easier way to do this within the SDK itself – the team will continue evaluating how to get this added natively, despite the lack of support in the current sdk generator tool we're using.

@schnerd Here it is (streamCompletion helper function code inspired by this snippet, courtesy of @rauschma) 👍 :

// https://2ality.com/2018/04/async-iter-nodejs.html#generator-%231%3A-from-chunks-to-lines
async function* chunksToLines(chunksAsync) {
  let previous = "";
  for await (const chunk of chunksAsync) {
    const bufferChunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
    previous += bufferChunk;
    let eolIndex;
    while ((eolIndex = previous.indexOf("\n")) >= 0) {
      // line includes the EOL
      const line = previous.slice(0, eolIndex + 1).trimEnd();
      if (line === "data: [DONE]") break;
      if (line.startsWith("data: ")) yield line;
      previous = previous.slice(eolIndex + 1);
    }
  }
}

async function* linesToMessages(linesAsync) {
  for await (const line of linesAsync) {
    const message = line.substring("data :".length);

    yield message;
  }
}

async function* streamCompletion(data) {
  yield* linesToMessages(chunksToLines(data));
}

try {
  const completion = await openai.createCompletion(
    {
      model: "text-davinci-003",
      max_tokens: 100,
      prompt: "It was the best of times",
      stream: true,
    },
    { responseType: "stream" }
  );

  for await (const message of streamCompletion(completion.data)) {
    try {
      const parsed = JSON.parse(message);
      const { text } = parsed.choices[0];

      process.stdout.write(text);
    } catch (error) {
      console.error("Could not JSON parse stream message", message, error);
    }
  }

  process.stdout.write("\n");
} catch (error) {
  if (error.response?.status) {
    console.error(error.response.status, error.message);

    for await (const data of error.response.data) {
      const message = data.toString();

      try {
        const parsed = JSON.parse(message);

        console.error("An error occurred during OpenAI request: ", parsed);
      } catch (error) {
        console.error("An error occurred during OpenAI request: ", message);
      }
    }
  } else {
    console.error("An error occurred during OpenAI request", error);
  }
}

@blakeross
Copy link

@gfortaine This solution works great with next.js API endpoints running on localhost. But once you deploy to Vercel, streaming responses via serverless functions are prohibited by AWS Lambda. You can get around this limitation by switching to next.js' experimental new Edge runtime, but then as far as I can tell that doesn't work with axios... which your solution relies on. So I still haven't found a way to actually stream openAI responses via next.js in production. Any ideas?

@blakeross
Copy link

@gfortaine Have got it working using fetch directly instead of the openAI lib but I believe there's a bug with chunksToLine. It appears to assume that chunks will be >= 1 line, but chunks can actually be part of a line. @rauschma's original implementation addresses this.

@gtokman
Copy link

gtokman commented Jan 10, 2023

@blakeross do you have any sample code on how you got it to work with next.js and vercel? Wouldn't the lambda finish if you sent a response back to the client?

@blakeross
Copy link

@gtokman it works if you use Vercel's new Edge runtime functions

@dan-kwiat
Copy link

@gtokman @blakeross may be useful: https://github.com/dan-kwiat/openai-edge

@gfortaine
Copy link

gfortaine commented Jan 17, 2023

Here is a fetch based client fully generated from SDK auto-generation tool 🎉 cc @schnerd @santimirandarp @blakeross @gtokman @dan-kwiat : #45 (comment)

(Bonus : it is wrapped by @vercel/fetch to provide retry (429 Network Error, ...) & DNS caching)

import { createConfiguration, OpenAIApi } from "@fortaine/openai";
import { streamCompletion } from "@fortaine/openai/stream";

import dotenv from "dotenv-flow";
dotenv.config({
  node_env: process.env.APP_ENV || process.env.NODE_ENV || "development",
  silent: true,
});

const configurationOpts = {
  authMethods: {
    apiKeyAuth: {
      accessToken: process.env.OPENAI_API_KEY,
    },
  },
};

const configuration = createConfiguration(configurationOpts);

const openai = new OpenAIApi(configuration);

try {
  const completion = await openai.createCompletion({
    model: "text-davinci-003",
    prompt: "1,2,3,",
    max_tokens: 193,
    temperature: 0,
    stream: true,
  });

  for await (const message of streamCompletion(completion)) {
    try {
      const parsed = JSON.parse(message);
      const { text } = parsed.choices[0];

      process.stdout.write(text);
    } catch (error) {
      console.error("Could not JSON parse stream message", message, error);
    }
  }
  process.stdout.write("\n");
} catch (error) {
  if (error.code) {
    try {
      const parsed = JSON.parse(error.body);
      console.error("An error occurred during OpenAI request: ", parsed);
    } catch (error) {
      console.error("An error occurred during OpenAI request: ", error);
    }
  } else {
    console.error("An error occurred during OpenAI request", error);
  }
}

@shawnswed
Copy link

@gfortaine we actually use @microsoft/fetch-event-source for the playground to do streaming with POST 👍

Thank you all for sharing your solutions here! I agree that @smervs solution currently looks like the best option available for the openai-node package. Here's a more complete example with proper error handling and no extra dependencies:

try {
    const res = await openai.createCompletion({
        model: "text-davinci-002",
        prompt: "It was the best of times",
        max_tokens: 100,
        temperature: 0,
        stream: true,
    }, { responseType: 'stream' });
    
    res.data.on('data', data => {
        const lines = data.toString().split('\n').filter(line => line.trim() !== '');
        for (const line of lines) {
            const message = line.replace(/^data: /, '');
            if (message === '[DONE]') {
                return; // Stream finished
            }
            try {
                const parsed = JSON.parse(message);
                console.log(parsed.choices[0].text);
            } catch(error) {
                console.error('Could not JSON parse stream message', message, error);
            }
        }
    });
} catch (error) {
    if (error.response?.status) {
        console.error(error.response.status, error.message);
        error.response.data.on('data', data => {
            const message = data.toString();
            try {
                const parsed = JSON.parse(message);
                console.error('An error occurred during OpenAI request: ', parsed);
            } catch(error) {
                console.error('An error occurred during OpenAI request: ', message);
            }
        });
    } else {
        console.error('An error occurred during OpenAI request', error);
    }
}

This could probably be refactored into a streamCompletion helper function (that uses either callbacks or es6 generators to emit new messages).

Apologies there's not an easier way to do this within the SDK itself – the team will continue evaluating how to get this added natively, despite the lack of support in the current sdk generator tool we're using.

Hi. Thanks for the great code. It works great in straight Node.js but in React it throws a 'res.data.on is not a function error. Maybe something to do with Webpack. Any insight would be appreciated. Thanks again.

@shawnswed
Copy link

Hi everyone.@smervs solution works great with straight Node.js but in React it throws a 'res.data.on() is not a function error. Maybe something to do with Webpack. Any insight would be appreciated. Thanks again.

@DerBasler
Copy link

@shawnswed I am facing the same issue:
Property 'on' does not exist on type 'CreateCompletionResponse'
🤔 I assume that we all using "openai": "^3.1.0",
I saw the pr from @gfortaine #45 so hopefully this one will soon be in
In the mean time I will try to somehow trick ts to ignore type and try to see if it works anyway. I hope I remember to update you ^^

@shawnswed
Copy link

Thanks, DerBasler. Please keep me in the loop.

@RobertCraigie
Copy link
Collaborator

Hey @Shaykuu, it looks like you can disable buffering, does this solve your issue?

@rattrayalex
Copy link
Collaborator

rattrayalex commented Jul 8, 2023

You can use stream: true in our upcoming v4 like so:

const stream = await client.completions.create({
  prompt: 'Say this is a test',
  model: 'text-davinci-003',
  stream: true,
});

for await (const part of stream) {
  process.stdout.write(part.choices[0]?.text || '');
}

A full example is here: https://github.com/openai/openai-node/blob/v4/examples/demo.ts

We expect to add additional conveniences for working with completion streams in the near future.

If you encounter any problems, please open a new issue!

EDIT: there's a bug for this in beta.3; beta.2 or beta.4 (which should be released in a few hours) will work in most environments.

@rattrayalex rattrayalex added the fixed in v4 Issues addressed by v4 label Jul 8, 2023
@robegamesios
Copy link

i get the parts per word, is there a way to get the parts by sentence or like a set duration (e.g. every 5 seconds)? thanks

@rattrayalex
Copy link
Collaborator

@robegamesios that's a question better suited for the community forum: https://community.openai.com (though the short answer is no)

@ashubham
Copy link

ashubham commented Aug 2, 2023

Well in most cases people use the openai client on the server side and forward the stream to the client. Now they cannot use the openai-node on browser/client because the stream is actually coming from their own backend. For this, we are releasing fetch-stream-observable. Which enables this use case in a simple way.

@rattrayalex
Copy link
Collaborator

That looks potentially handy; please keep in mind that API Keys should not be used in the browser: https://help.openai.com/en/articles/5112595-best-practices-for-api-key-safety

@ManInTheWind
Copy link

You can use stream: true in our upcoming v4 like so:

const stream = await client.completions.create({
  prompt: 'Say this is a test',
  model: 'text-davinci-003',
  stream: true,
});

for await (const part of stream) {
  process.stdout.write(part.choices[0]?.text || '');
}

A full example is here: https://github.com/openai/openai-node/blob/v4/examples/demo.ts

We expect to add additional conveniences for working with completion streams in the near future.

If you encounter any problems, please open a new issue!

EDIT: there's a bug for this in beta.3; beta.2 or beta.4 (which should be released in a few hours) will work in most environments.

I feel that this stream is not good, when do I know the end of the stream?
and the error of stream.

@rattrayalex
Copy link
Collaborator

Like this:

try {
  for await (const part of stream) {
    process.stdout.write(part.choices[0]?.text || '');
  }
  console.log('The stream is over.')
} catch (err) {
  console.error('The stream had an error', err)
}

@huytool157
Copy link

How do I get the token count once the stream is complete? I don't see it on the example. The new API is much nicer btw.

@rattrayalex
Copy link
Collaborator

As mentioned earlier in the thread:

usage data is not currently available in streamed responses, we may look into adding this in the future

@4vanger
Copy link

4vanger commented Oct 15, 2023

You can use stream: true in our upcoming v4 like so:
Neat! However it is really non-trivial making it work with express.js - I hacked a solution using custom ReadableStream implementation to pipe into response but I feel that's not the easiest solution. Adding another example for streaming responses from server to client will be very helpful!

@Pedroglp
Copy link

Pedroglp commented Nov 15, 2023

Finally I was able to make it work using Sveltekit:

export const POST = (async ({ request }) => {
    const payload = await request.json();
    let streamOpenAI:Stream<OpenAI.Chat.Completions.ChatCompletionChunk>;

    try { 
        streamOpenAI = (await (openai.chat.completions.create({
            model: "gpt-4",
            messages: [{
                role: "user",
                content: `your prompt`
            }],
            stream: true,})
        ));
    } catch(e) {
        throw error(500, 'Failed creating data stream.');
    }

    let response = new ReadableStream({
        async start(controller) {
            try {
                for await (const part of streamOpenAI) {
                    controller.enqueue(part.choices[0]?.delta.content || '');
                }
                controller.close();
                return;
            } catch (err) {
                controller.close();
                throw error(500, 'Error while processing data stream.')
            }
        },
    })

    
    return new Response(response, {
        status: 200,
        headers: {
          'content-type': 'text/event-stream',
          'Connection': 'keep-alive'
        }
    });
}) satisfies RequestHandler;

@rattrayalex
Copy link
Collaborator

rattrayalex commented Nov 16, 2023

@Pedroglp you may be interested in .toReadableStream() and .fromReadableStream(), – see examples/stream-to-client-next.ts for one example

@Frankjunyulin
Copy link

Frankjunyulin commented Nov 18, 2023

@rattrayalex
Hi Alex, I followed the references you shared, but got following error:

[13:15:43.316] ERROR (58268): We are sorry, something goes wrong: Converting circular structure to JSON
    --> starting at object with constructor 'ReadableStream'
    |     property '_readableStreamController' -> object with constructor 'ReadableStreamDefaultController'
    --- property '_controlledReadableStream' closes the circle

Don't know how to figure it out, could you help?

In my backend:

export default async (
  req: NextApiRequest,
  res: NextApiResponse
): Promise<void> => {
...

  const completion = awaitopenaiApi().chat.completions.create({
    model: OPENAI_API_MODEL,
    temperature: 0,
    openaiMessages,
    stream: true,
  })
  if (completion) {
    const message = {
      content: completion.toReadableStream(),
      role: "assistant",
      createdAt: new Date().toISOString(),
    };
    /*
    await prisma.conversation.update({
      where: {id: conversation.id},
      data: {
        messages: {
          push: message,
        },
      },
    });
    */
  
    res.status(200).json({conversationId: conversation.id, message});
  } else {
    res.status(200).json({conversationId: conversation.id});
  }

In the Frontend:

const data = await response.json();
      const runner = ChatCompletionStreamingRunner.fromReadableStream(
        data.message
      );
      runner.on("content", (delta, snapshot) => {
        console.log(delta);
      });
      console.dir(await runner.finalChatCompletion(), {depth: null});
    } catch (error) {
      // Consider implementing your own error handling logic here
      logger.error(error);
      // alert(error.message);
    }

@rattrayalex
Copy link
Collaborator

@Frankjunyulin you can't send a stream within a json response body, the stream must be the entire response body. Your code should look more like this:

export default async (
  req: NextApiRequest,
  res: NextApiResponse
): Promise<void> => {
  // ...

  const stream = await openaiApi().beta.chat.completions.stream({
    model: OPENAI_API_MODEL,
    temperature: 0,
    openaiMessages,
    stream: true,
  })
  
  stream.on('message', async (message) => {
    await prisma.conversation.update({
      where: {id: conversation.id},
      data: {
        messages: {
          push: message,
        },
      },
    });
  })
  
  return Response(stream.toReadableStream()) // this is how it works in app router, not sure the exact syntax for this in pages router…
}

and on the frontend, you want more like this:

const runner = ChatCompletionStreamingRunner.fromReadableStream(
  response.body
);
runner.on("content", (delta, snapshot) => {
  console.log(delta);
});
console.dir(await runner.finalChatCompletion(), {depth: null});

@Frankjunyulin
Copy link

@rattrayalex Thank you for your response. But then how to pass other fields to frontend from backend?

We also want to pass other information, like conversationId (in our code above). How could we do that (without Json)?

@rattrayalex
Copy link
Collaborator

Probably the easiest way would actually be to add that to a header. Otherwise you'd need to DIY the entire stream backend & frontend, or do it in a separate request.

@adaboese
Copy link

adaboese commented Dec 4, 2023

This is how I stream:

let content = '';

for await (const chunk of stream) {
  content += chunk.choices[0]?.delta?.content || '';
}

I am having a problem where this results in corrupted JSON:

[
  {
    "title": "Melting Prospects: A Harrowing Look into the Future of Polar Bears"
  },
  {
    "title": "When the Ice Fades: Personal Narratives of Polar Bears' Struggles Against Climate Change"
  },
  {
    "title": "Bear Necessities: 7 Urgent Steps We Must Take to Protect Our Polar Giants"
  },
  {
 the  "title": "Up Close with the Kings of the Arctic: How Environmentalists are Battling for Polar Bear Survival"
  },
  {
    "title": "Ice Cap Casualties: The Unseen Impact of Pollution on Polar Bear Habitats – An Opinion Piece"
  }
]

Has anyone dealt with this and can advise what I am doing wrong?

@adaboese
Copy link

adaboese commented Dec 4, 2023

I should mention that this doesn't happen every time. Appears to happen sometimes.

@rattrayalex
Copy link
Collaborator

Problems related to streaming should be opened as new issues. (Please note that this repo is only for problems in the SDK, not the underlying API).

@dereckmezquita
Copy link

dereckmezquita commented Mar 20, 2024

I'm looking for how to transform/modify the stream data en route, that is modify it while receiving it and sending it through for other use. Does anyone have anything like this?

public async chat_completion(): Promise<any> {
    const result: Stream<ChatCompletionChunk> = await this.ai.chat.completions.create({
        max_tokens: this.options.max_tokens,
        model: this.options.model,
        messages: [
            {
                role: 'system',
                content: this.options.system_prompt
            },
            ...this.conversation
        ],
        stream: this.options.stream as true
    });

  let finalMessage = '';
  // transform the stream and concatenate response to finalMessage for internal use on my class

  return modifiedStream;
}

Update I was able to accomplish something like what I wanted with the Stream class.

public async chat_completion(): Promise<Stream<ChatCompletionChunk>> {
    const result: Stream<ChatCompletionChunk> = await this.ai.chat.completions.create({
        max_tokens: this.options.max_tokens,
        model: this.options.model,
        messages: [
            {
                role: 'system',
                content: this.options.system_prompt
            },
            ...this.conversation
        ],
        stream: this.options.stream as true
    });

    let message: string = '';

    const modifiedStream = new Stream(async function*() {
        console.log("Modifying the data...");
        for await (const chunk of result) {
            // Modify the chunk data here
            const modifiedChunk = {
                ...chunk,
                choices: chunk.choices.map(choice => ({
                    ...choice,
                    delta: {
                        ...choice.delta,
                        content: choice.delta.content ? choice.delta.content.toUpperCase() : undefined
                    }
                }))
            };

            // Accumulate the content in the 'message' variable
            if (modifiedChunk.choices[0].delta.content) {
                message += modifiedChunk.choices[0].delta.content;
            }

            yield modifiedChunk;
        }

        console.log('The final message is: ', message.length);
    }, result.controller);

    return modifiedStream;
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
fixed in v4 Issues addressed by v4
Projects
None yet
Development

No branches or pull requests