import com.alibaba.nacos.common.utils.JacksonUtils;
import com.theokanning.openai.completion.chat.ChatCompletionChunk;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.FluxSink;
@Slf4j
public class OpenAiSubscriber implements Subscriber<String>, Disposable {
private final FluxSink<String> emitter;
private final StringBuilder stringBuilder;
private Subscription subscription;
public OpenAiSubscriber(FluxSink<String> emitter) {
this.emitter = emitter;
this.stringBuilder = new StringBuilder();
}
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
subscription.request(1);
}
@Override
public void onNext(String data) {
log.info("OpenAI返回数据:{}", data);
if ("[DONE]".equals(data)) {
log.info("OpenAI返回数据结束了");
subscription.request(1);
emitter.complete();
} else {
ChatCompletionChunk response = JacksonUtils.toObj(data, ChatCompletionChunk.class);
String content = response.getChoices().get(0).getMessage().getContent();
content = content == null ? "" : content;
emitter.next(content);
stringBuilder.append(content);
subscription.request(1);
}
}
@Override
public void onError(Throwable t) {
log.error("OpenAI返回数据异常:{}", t.getMessage());
emitter.error(t);
}
@Override
public void onComplete() {
log.info("OpenAI返回数据完成");
emitter.complete();
}
@Override
public void dispose() {
log.warn("OpenAI返回数据关闭");
emitter.complete();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64