
Building a Fully Functional AI Assistant with OpenAI (Series)
The BaseAssistant consolidates repeated patterns (thread management, handling tool calls, file uploads, etc.) so each specialized assistant can focus on domain-specific logic. For the full file, see our GitHub source. This is a generic version—just adapt or remove the placeholders to suit your domain.
Step-by-Step
export abstract class BaseAssistant extends Assistant {
  protected http: AxiosInstance;
  protected openai: OpenAiService;
  protected storageService: IStorageService; // example file storage interface
  protected queueService: QueueService; // for background tasks
  protected logger: Logger;
  protected cache: Redis;
  protected someRepository: GenericRepository; // domain-agnostic example

  constructor(protected container: ServiceProvider) {
    super();
    container.initialize(this); // sets all dependencies on 'this'
    this.logger = this.logger.child({ class: BaseAssistant.name });
    this.http = axios.create({ timeout: 10000 }); // 10s request timeout (a top-level Axios option, not a header)
  }
  // ...
}
I inject a ServiceProvider that sets up the dependencies (OpenAiService, queue, cache, etc.). Then I configure Axios for any external HTTP calls. Everything else is domain-agnostic.
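The ServiceProvider itself isn't shown in this post. As a rough idea, a property-injection version could look like the sketch below; the registry-based approach is an assumption, and your container might wrap a DI framework instead.

// Sketch only: one plausible shape for the ServiceProvider used above.
export class ServiceProvider {
  constructor(private readonly registry: Map<string, unknown>) {}

  // Copies every registered dependency onto the target, so fields like
  // 'logger', 'openai', 'cache', and 'someRepository' are populated.
  initialize(target: Record<string, unknown>): void {
    for (const [key, value] of this.registry) {
      target[key] = value;
    }
  }
}

With dependencies wired up, the main entry point is chat(...):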
async chat(ctx: ChatContext): Promise<ChatResult> {
  // 1) Possibly handle file uploads
  await this.handleFiles(ctx);

  // 2) Create or reuse a thread
  const threadId = ctx.input.threadId || (await this.openai.assistant.createThread()).id;

  // 3) Add the user's message to the thread
  if (ctx.input.content) {
    await this.openai.assistant.addMessage(threadId, ctx.input.content, {}, 'user');
  }

  // 4) If skipRun is true, just return messages as is
  if (ctx.input.skipRun) {
    const messages = await this.openai.assistant.getMessages(threadId);
    return { threadId, messages, assistantId: this.id, runId: 'skipped' };
  }

  // 5) Check for an active run; if found, handle it
  let run = await this.openai.assistant.getActiveRun(threadId);
  if (run) {
    return this.handleRun({ ...ctx, threadId, run, messages: [] });
  }

  // 6) Otherwise, start a new run
  run = await this.openai.assistant.runAndPoll(threadId, this.id, ctx.toolChoice, ctx.additionalInstructions);

  // 7) After the run, handle it (it may require tool calls)
  const messagesAfterRun = await this.openai.assistant.getMessages(threadId);
  const chatResult = await this.handleRun({
    ...ctx,
    threadId,
    run,
    messages: messagesAfterRun,
  });

  // 8) Optionally post-process messages
  const finalMessages = await this.postProcessMessages({
    user: ctx.user,
    threadId,
    messages: chatResult.messages,
  });

  // 9) Return final ChatResult
  chatResult.messages = finalMessages;
  return chatResult;
}
My chat(...) method orchestrates the entire flow: creating threads, adding messages, running the AI, and optionally processing any returned data at the end.
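Calling it from application code might look like this; the SupportAssistant class and the exact ChatContext field names are illustrative, so adapt them to your own DTOs.

// Hypothetical usage of a derived assistant
const assistant = new SupportAssistant(container);

const result = await assistant.chat({
  user: currentUser,
  input: {
    threadId: existingThreadId, // omit to start a fresh thread
    content: 'Summarize the report I just uploaded',
    files: [],
    skipRun: false,
  },
});

console.log(result.threadId, result.runId, result.messages.length);

Inside chat(...), runs that require tool calls are routed through handleRun(...):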
async handleRun(ctx: RunContext): Promise<ChatResult> {
  const { threadId, run, messages } = ctx;
  this.logger.info('Handling run', { threadId, runId: run.id, status: run.status });

  switch (run.status) {
    case 'requires_action': {
      // Possibly handle shared tools
      const sharedResult = await this.handleShared(ctx);
      if (sharedResult) {
        // If a shared tool was handled, check if the run is completed or if we keep going
        const updatedRun = await this.openai.assistant.getRunStatus(threadId, run.id);
        // if not completed, might loop or call handle(...) on a derived class
      } else {
        // If the tool wasn't "shared", let the derived class handle it
        const result = await this.handle({ ...ctx });
        if (result.chat) return result.chat;
        if (result.messages) {
          return {
            threadId,
            runId: run.id,
            assistantId: this.id,
            messages: result.messages,
          };
        }
      }
      break;
    }
    case 'completed':
      this.logger.info('Run is complete', { threadId, runId: run.id });
      break;
    // handle other statuses like 'failed' or 'cancelled' as needed
  }

  // By default, just return the messages as is
  return {
    threadId,
    runId: run.id,
    assistantId: this.id,
    messages,
  };
}
private shouldHandleShared(ctx: ActionContext): boolean {
  // checks if the requested tool is among the "shared" set
  const { run } = ctx;
  const calls = run.required_action?.submit_tool_outputs?.tool_calls || [];
  return calls.some((c) => ['AddUser', 'FetchUrlContents', 'GetSecondOpinion'].includes(c.function?.name));
}

async handleShared(ctx: ActionContext): Promise<ActionResult | undefined> {
  // 1) Identify which function is requested
  // 2) If it's a shared tool, call the appropriate 'onX' function
  // 3) Submit tool output, poll for completion
  // 4) Return action result or undefined
  return undefined; // placeholder
}
handleRun(...) checks the run status and either continues or delegates to a specialized method. Meanwhile, handleShared(...) checks whether the requested tool is in the shared set and, if so, processes it at the base level. If not, we let a derived assistant handle domain-specific actions.
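As a rough sketch of what the placeholder above could look like: the submitToolOutputsAndPoll helper and the exact ActionResult shape are assumptions, not part of the original source.

async handleShared(ctx: ActionContext): Promise<ActionResult | undefined> {
  const { threadId, run } = ctx;
  if (!this.shouldHandleShared(ctx)) return undefined;

  const calls = run.required_action?.submit_tool_outputs?.tool_calls ?? [];
  const toolOutputs: { tool_call_id: string; output: string }[] = [];

  for (const call of calls) {
    switch (call.function?.name) {
      case 'AddUser':
        toolOutputs.push((await this.onAddUser(call)).toolOutput);
        break;
      case 'FetchUrlContents':
        toolOutputs.push((await this.onFetchUrlContents(call)).toolOutput);
        break;
      default:
        // A non-shared tool is present; defer the whole batch to the derived class
        return undefined;
    }
  }

  // Assumed wrapper that submits the outputs and waits for the run to settle
  await this.openai.assistant.submitToolOutputsAndPoll(threadId, run.id, toolOutputs);
  return { messages: await this.openai.assistant.getMessages(threadId) };
}

The base class also centralizes file handling: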
async handleFiles(ctx: ChatContext) {
  // If no files or skipRun is true, do nothing
  if (!ctx.input.files || ctx.input.skipRun) return;

  // Example: upload each file to your storage system
  const threadId = ctx.input.threadId;
  for (const file of ctx.input.files) {
    // 1) Convert base64 -> Buffer
    const buffer = Buffer.from(file.base64, 'base64');

    // 2) Upload (S3, local, etc.)
    await this.storageService.put(buffer, file.name, file.fileType);

    // 3) Optionally add a message referencing the file
    await this.openai.assistant.addMessage(
      threadId || '',
      `I have uploaded file: ${file.name}`,
      {},
      'user'
    );
  }
}
Handling file uploads is domain-specific, but the pattern is similar: read the file data, store it somewhere, then optionally attach a note to the conversation so the AI is aware new data is available.
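For reference, the file objects consumed above could be described with an interface along these lines; the field names mirror the code, but the interface itself is illustrative.

interface UploadedFileDto {
  name: string;     // e.g. 'report.pdf'
  fileType: string; // MIME type such as 'application/pdf'
  base64: string;   // file contents encoded as base64
}

The shared tool handlers follow the same parse, act, and return pattern: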
protected async onFetchUrlContents(toolCall: RequiredActionFunctionToolCall): Promise<IToolResult> {
  // 1) Parse the function arguments
  const args = JSON.parse(toolCall.function?.arguments ?? '{}');
  if (!args.url) {
    return { toolOutput: { tool_call_id: toolCall.id, output: 'No url provided' } };
  }

  // 2) Use Axios or another HTTP library
  const response = await this.http.get(args.url);

  // 3) Return the result so the assistant can continue (tool output must be a string)
  return {
    toolOutput: {
      tool_call_id: toolCall.id,
      output: typeof response.data === 'string' ? response.data : JSON.stringify(response.data),
    },
  };
}
protected async onAddUser(toolCall: RequiredActionFunctionToolCall): Promise<IToolResult> {
  // 1) Parse arguments
  const args = JSON.parse(toolCall.function?.arguments ?? '{}');
  if (!args.email) {
    return { toolOutput: { tool_call_id: toolCall.id, output: 'Missing email' } };
  }

  // 2) Do something domain-specific, e.g. create a user in your DB
  const createdId = await this.someRepository.createUser(args.email);
  return {
    toolOutput: {
      tool_call_id: toolCall.id,
      output: JSON.stringify({ success: true, userId: createdId }),
    },
  };
}
These onX() methods show how to interpret tool arguments, perform an external action, and return structured output. If you have more domain-specific logic—like processing PDFs or searching a knowledge base—just create a new method and tie it into handleShared or your derived class’s handle method.
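For contrast, here is a hypothetical derived assistant whose handle(...) method services a domain-specific tool. The class name, the SearchKnowledgeBase tool, the knowledgeBase service, and the submitToolOutputsAndPoll helper are all illustrative rather than part of the original source.

export class SupportAssistant extends BaseAssistant {
  protected knowledgeBase: KnowledgeBaseService; // hypothetical domain service

  async handle(ctx: ActionContext): Promise<ActionResult> {
    const calls = ctx.run.required_action?.submit_tool_outputs?.tool_calls ?? [];
    const toolOutputs: { tool_call_id: string; output: string }[] = [];

    for (const call of calls) {
      if (call.function?.name === 'SearchKnowledgeBase') {
        const args = JSON.parse(call.function?.arguments ?? '{}');
        const hits = await this.knowledgeBase.search(args.query);
        toolOutputs.push({ tool_call_id: call.id, output: JSON.stringify(hits) });
      }
    }

    await this.openai.assistant.submitToolOutputsAndPoll(ctx.threadId, ctx.run.id, toolOutputs);
    return { messages: await this.openai.assistant.getMessages(ctx.threadId) };
  }
}

Finally, two hooks let you adjust messages on either side of the run: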
async preProcessMessages(ctx: PreProcessingContext): Promise<MessageDto[]> {
  // Optionally modify or filter messages before the run
  return ctx.messages;
}

async postProcessMessages(ctx: PostProcessingContext): Promise<MessageDto[]> {
  // Possibly add metadata to newly generated messages
  // For instance, stamping 'assistantId' or adjusting formatting
  return ctx.messages;
}
preProcessMessages runs just before the assistant is invoked (useful for sanitizing or injecting instructions), while postProcessMessages runs right after the assistant responds (useful for tagging messages with metadata).
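A derived class can override either hook. For example, a postProcessMessages override that stamps each message with the assistant's id might look like this; the metadata field on MessageDto is assumed, so adjust it to whatever your DTO actually exposes.

async postProcessMessages(ctx: PostProcessingContext): Promise<MessageDto[]> {
  return ctx.messages.map((message) => ({
    ...message,
    metadata: { ...message.metadata, assistantId: this.id }, // 'metadata' is an assumed field
  }));
}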
Don't quit now!
Go to Part 5