Building a Fully Functional AI Assistant with OpenAI (Series)

Building a Fully Functional AI Assistant with OpenAI (Series)

OpenAI
AI
Assistant
Agent
RAG
Tools
TypeScript
2025-01-26

The BaseAssistant consolidates repeated patterns (thread management, handling tool calls, file uploads, etc.) so each specialized assistant can focus on domain-specific logic. For the full file, see our GitHub source. This is a generic version—just adapt or remove the placeholders to suit your domain.

export abstract class BaseAssistant extends Assistant { protected http: AxiosInstance; protected openai: OpenAiService; protected storageService: IStorageService; // example file storage interface protected queueService: QueueService; // for background tasks protected logger: Logger; protected cache: Redis; protected someRepository: GenericRepository; // domain-agnostic example constructor(protected container: ServiceProvider) { super(); container.initialize(this); // sets all dependencies on 'this' this.logger = this.logger.child({ class: BaseAssistant.name }); this.http = axios.create({ headers: { timeout: 10000 } }); } // ... }

I inject a ServiceProvider that sets up the dependencies (OpenAiService, queue, cache, etc.). Then I configure Axios for any external HTTP calls. Everything else is domain-agnostic.

chat(...) Implementation
async chat(ctx: ChatContext): Promise<ChatResult> { // 1) Possibly handle file uploads await this.handleFiles(ctx); // 2) Create or reuse a thread const threadId = ctx.input.threadId || (await this.openai.assistant.createThread()).id; // 3) Add the user's message to the thread if (ctx.input.content) { await this.openai.assistant.addMessage(threadId, ctx.input.content, {}, 'user'); } // 4) If skipRun is true, just return messages as is if (ctx.input.skipRun) { const messages = await this.openai.assistant.getMessages(threadId); return { threadId, messages, assistantId: this.id, runId: 'skipped' }; } // 5) Check for an active run; if found, handle it let run = await this.openai.assistant.getActiveRun(threadId); if (run) { return this.handleRun({ ...ctx, threadId, run, messages: [] }); } // 6) Otherwise, start a new run run = await this.openai.assistant.runAndPoll(threadId, this.id, ctx.toolChoice, ctx.additionalInstructions); // 7) After the run, handle it (it may require tool calls) const messagesAfterRun = await this.openai.assistant.getMessages(threadId); const chatResult = await this.handleRun({ ...ctx, threadId, run, messages: messagesAfterRun, }); // 8) Optionally post-process messages const finalMessages = await this.postProcessMessages({ user: ctx.user, threadId, messages: chatResult.messages, }); // 9) Return final ChatResult chatResult.messages = finalMessages; return chatResult; }

My chat(...) method orchestrates the entire flow: creating threads, adding messages, running the AI, and optionally processing any returned data at the end.

handleRun(...) & handleShared(...)
async handleRun(ctx: RunContext): Promise<ChatResult> { const { threadId, run, messages } = ctx; this.logger.info('Handling run', { threadId, runId: run.id, status: run.status }); switch (run.status) { case 'requires_action': // Possibly handle shared tools const sharedResult = await this.handleShared(ctx); if (sharedResult) { // If a shared tool was handled, check if the run is completed or if we keep going const updatedRun = await this.openai.assistant.getRunStatus(threadId, run.id); // if not completed, might loop or call handle(...) on a derived class } else { // If the tool wasn't "shared", let the derived class handle it const result = await this.handle({ ...ctx }); if (result.chat) return result.chat; if (result.messages) { return { threadId, runId: run.id, assistantId: this.id, messages: result.messages, }; } } break; case 'completed': this.logger.info('Run is complete', { threadId, runId: run.id }); break; // handle other statuses like 'failed' or 'cancelled' as needed } // By default, just return the messages as is return { threadId, runId: run.id, assistantId: this.id, messages, }; } private shouldHandleShared(ctx: ActionContext): boolean { // checks if the requested tool is among the "shared" set const { run } = ctx; const calls = run.required_action?.submit_tool_outputs?.tool_calls || []; return calls.some((c) => ['AddUser','FetchUrlContents','GetSecondOpinion'].includes(c.function?.name)); } async handleShared(ctx: ActionContext): Promise<ActionResult | undefined> { // 1) Identify which function is requested // 2) If it's a shared tool, call the appropriate 'onX' function // 3) Submit tool output, poll for completion // 4) Return action result or undefined return undefined; // placeholder }

handleRun(...) checks the run status and either continues or delegates to a specialized method. Meanwhile, handleShared(...) checks if a tool is in the “generic tools” list and, if so, processes it at the base level. If not, we let a derived assistant handle domain-specific actions.

handleFiles(...) Implementation
async handleFiles(ctx: ChatContext) { // If no files or skipRun is true, do nothing if (!ctx.input.files || ctx.input.skipRun) return; // Example: upload each file to your storage system const threadId = ctx.input.threadId; for (const file of ctx.input.files) { // 1) Convert base64 -> Buffer const buffer = Buffer.from(file.base64, 'base64'); // 2) Upload (S3, local, etc.) await this.storageService.put(buffer, file.name, file.fileType); // 3) Optionally add a message referencing the file await this.openai.assistant.addMessage( threadId || '', `I have uploaded file: ${file.name}`, {}, 'user' ); } }

Handling file uploads is domain-specific, but the pattern is similar: read the file data, store it somewhere, then optionally attach a note to the conversation so the AI is aware new data is available.

Examples of onX(...) methods
protected async onFetchUrlContents(toolCall: RequiredActionFunctionToolCall): Promise<IToolResult> { // 1) Parse the function arguments const args = JSON.parse(toolCall.function?.arguments ?? '{}'); if (!args.url) { return { toolOutput: { tool_call_id: toolCall.id, output: 'No url provided' } }; } // 2) Use Axios or another HTTP library const response = await this.http.get(args.url); // 3) Return the result so the assistant can continue return { toolOutput: { tool_call_id: toolCall.id, output: response.data } }; } protected async onAddUser(toolCall: RequiredActionFunctionToolCall): Promise<IToolResult> { // 1) Parse arguments const args = JSON.parse(toolCall.function?.arguments ?? '{}'); if (!args.email) { return { toolOutput: { tool_call_id: toolCall.id, output: 'Missing email' } }; } // 2) Do something domain-specific, e.g. create a user in your DB const createdId = await this.someRepository.createUser(args.email); return { toolOutput: { tool_call_id: toolCall.id, output: JSON.stringify({ success: true, userId: createdId }) } }; }

These onX() methods show how to interpret tool arguments, perform an external action, and return structured output. If you have more domain-specific logic—like processing PDFs or searching a knowledge base—just create a new method and tie it into handleShared or your derived class’s handle method.

preProcessMessages(...) & postProcessMessages(...)
async preProcessMessages(ctx: PreProcessingContext): Promise<MessageDto[]> { // Optionally modify or filter messages before the run return ctx.messages; } async postProcessMessages(ctx: PostProcessingContext): Promise<MessageDto[]> { // Possibly add metadata to newly generated messages // For instance, stamping 'assistantId' or adjusting formatting return ctx.messages; }

preProcessMessages runs just before the assistant is invoked (useful for sanitizing or injecting instructions), while postProcessMessages runs right after the assistant responds (useful for tagging messages with metadata).

Go to Part 5