# Building Self-Healing Data Pipeline - End to End Data Engineering Project

## Метаданные

- **Канал:** CodeWithYu
- **YouTube:** https://www.youtube.com/watch?v=As1QSF3LnvA
- **Дата:** 08.12.2025
- **Длительность:** 1:34:12
- **Просмотры:** 4,573

## Описание

In this video, I'll show you how to build a production-ready, AI-powered data pipeline that automatically detects and heals data quality issues in real-time. No more failed pipelines because of bad data!

We'll combine the power of Apache Airflow 3.0 with Ollama (running LLaMA 3.2 locally) to create an intelligent pipeline that:
✅ Automatically diagnoses data quality issues (missing values, wrong types, malformed text)
✅ Self-heals problematic records without manual intervention
✅ Performs sentiment analysis on millions of Yelp reviews using local LLM
✅ Generates comprehensive health reports and metrics
✅ Gracefully degrades when things go wrong

This is the future of data engineering - pipelines that think for themselves and fix problems before they become failures.

What You'll Learn:
✅ How to build agentic workflows in Apache Airflow
✅ Integrating local LLMs (Ollama) into your data pipelines
✅ Implementing self-healing patterns for data quality
✅ Batch processing strategies for large datasets
✅ Building health monitoring and observability into pipelines

Like this video? Support us: https://www.youtube.com/@CodeWithYu/join

Timestamps:
0:00 Introduction
1:43 System Architecture and background
5:49 Setting up the project
13:27 The Agentic Self Healing Pipeline
17:00 Embedding AI Agents in Airflow
40:44 Diagnosing and Healing Pipelines
1:11:44 Generating Health Reports
1:16:12 Results and Review
1:30:00 Outro

Resources:
Read more: https://open.substack.com/pub/datainproduction/p/why-agentic-workflows-change-everything
Full Code+Video: https://buymeacoffee.com/yusuf.ganiyu/source-code-self-healing-agentic-data-pipeline
Full Source Code: https://github.com/airscholar/SelfHealingPipeline
Ollama Download: https://ollama.com/download
Apache Airflow: https://airflow.apache.org/

Connect With Me:
LinkedIn: https://linkedin.com/in/yusuf-ganiyu
GitHub: https://github.com/airscholar
Twitter/X: https://x.com/yusufOGaniyu

#dataengineering #airflow #python #llm #ollama #datapipeline #machinelearning #ai #selfhealing #apacheairflow #dataengineer #etl #dataquality

## Содержание

### [0:00](https://www.youtube.com/watch?v=As1QSF3LnvA) Introduction

Building a pipeline is great and most likely if you're watching this video you can do that as well. The challenge is how do you develop a self-healing pipeline? a pipeline that recovers from errors, take care of the impute correctly, maybe leveraging some aentic AI or some machine learning models that's already been you know trained to do this kind of work and you can also prompt and you know repurpose some of these models that we currently have in the market like G G G G GPT5 um Claw Antropic U Lama in this case regardless of whatever model depending on how big the context is you and repurpose them to serve your purpose. We're going to be taking a closer look at some specific use case in this project uh where we designed a pipeline end to end that focuses on sentiment analysis and how we can recover from you know failures, degradations and all that get some statistics in place and how we can recover and you know create an end to end self-healing pipeline um for our end to end uh sentiment analysis project. If you are new in here and this is probably your first time of watching this uh channel, don't forget to like, comment, share, and subscribe. Uh if you are a returning subscriber, thank you very much for watching my videos. Uh let's get started and go into the architecture section of this video and see what that look like. — Let's start with the architecture of the

### [1:43](https://www.youtube.com/watch?v=As1QSF3LnvA&t=103s) System Architecture and background

system we are building today. We'll start by setting up our orchestrator. In our case, we'll be using Apache Airflow for the orchestration end to end. We can have multiple sources in this case. So, it could be a database where the all the reviews are stored. Oh, by the way, this is going to be based on the sentiment analysis uh use case and you can decide to adapt this for any other use cases, but this is going to be like a demonstration purpose for sentiment analysis. So once we have the review database or it could be a file or it could even be a real time stream regardless. Um with the new features in Apache Airflow 3. 0, You can listen to real time events from the consumers from the producers and the consumers will just automatically pick that up. Let me know in the comments section if you want me to explore that with you. So we could have a review database a real time stream um or it could be a JSON file or even a CSV or whatever it is that could be in our system. So we have an ingestion or staging database uh or pipeline in this case. So which is going to be a task that is going to be reading this data continuously. It could be from the real time stream. It could be the review database is quering this or it could just be you know selecting some data from some JSON file. Once that is done we pass it to the diagnosis or e service. This is very critical because this is where we do the structuring of our data in the right shape the right format. Do data quality cleansings and stuff like that. We pass this eventually to our sentiment analysis engine and in this engine is where we do the final validations of the data in case something is wrong we try to put in some uh self-healing capabilities in this case which is the essential uh goal of this um project. So we make sure that regardless of how the data looks like whether it's healed or not we always continuously have some sentiment analysis engine uh which is going to be um continuously running without having to suffer any uh downtime. Afterwards, we do some results aggregations and we push this data to some reasonable locations or to data warehouse or data lake. And this can just be listened to by BI uh dashboards or health reports or whatever it is that we want to use. But this is going to be uh something that is going to be left to you to work with. But at the end of the day, we could either write this to some JSON file, some CSV or even to this um data warehouse or data link. So this is going to be left totally with you. So I'm just going to work on a simple way where we can write the data out and you can just pick up um whatever it is you want to do afterwards. I hope this is making some sense. Uh so I didn't talk about the model registry in this case. So because we could have um some model trained uh in the system doing this but for sentiment analysis this is probably some rare occasions where you just leverage the existing um services uh or the models that are already providing some uh sentiment analysis and capabilities as well. So in our case this model registry wouldn't be something that will be relevant in our use case but it is something that you can explore as well if you decide to train your model from scratch or even build a rag system for this essentially u this is what the highle architecture of the system look like end to end. So let's get into the actual implementation and get started with that. So I'm going to switch over to uh PyCharm and I'll call this self healing pipeline. I'll be using Python 3. 12 for this. So in case you want to do the same um 3. 14 is having some issues with um Apache Airflume. So you might not want to consider using that. Um I think 3. 10 and 12 downwards kind of works fine. uh don't use below 3. 9 I believe for um

### [5:49](https://www.youtube.com/watch?v=As1QSF3LnvA&t=349s) Setting up the project

airflow 3 points so it might suffer some incompatibility issues but so it's between 3. 9 10 11 and 12 you should be good to go um yeah that should be fine so uh create a g repository and um create the project okay cool so with this we can you know get our project underway so let's focus um majorly on the coding part and I'll just extend this so it covers the entire screen. Cool. Now uh we have our pipeline uh all ready to go. So let's just get let's start with the requirement. txt cuz this is what will be used um to set up the project. So I'm just going to start with the requirement. txt. So I'm going to say requirement. txt. So we can get in some uh dependencies into the picture here. So we're going to be using Apache Airflow. Uh we don't have to specify some versions, but for the sake of people that might be watching this at a later time, uh let's use 3. 0. 6 as our airflow version. So we can just say greater than or equals to 3. 06. That should be fine. All right. So we have airflow provider pro provider Apache airflow providers. Yeah. So let's use the fabulous which is going to be greater than or equals to 3. 0. 0. Why am I using full stop instead of dash? Yeah, that should be fine. So um this should be the dependencies for Apache Airflow. Now for our machine learning uh and national natural language processing we'll be using transformers uh in this case or alternatively we don't necessarily need to use a transformer because we'll be using something else. In our case we'll be using O Lama. So I don't think um transformers will be useful in this case but um I'll just put it in here anyways. Maybe touch in case we want to explore that. Uh but this is what we'll be using in our case. Olama and then we have psychop g2 binary and we can get in some pi test in this case at the end of the day. So uh for pyop g2 binary we just say 2. 9. 0. Okay, good. So, you can decide to use any other versions. But for old lama, let's use um 0. 6. 0. I suppose that should be fine. So, we can just say pep install our requirements. txt. So, that should set up um our requirement. txt for us. Good. Now while this is running and in in place we can just uh create uh directories that will be used for Apache Airflow. I think I should increase the size a little bit. Okay, now let's create our directories. So, we're going to have our DAX, models, impute outputs, and logs. So, this is going to be our basic um directories for our project. So in our case we have all of this. So let's get in our dag. The first dag we'll be creating is going to be sentiment. In fact let's say aentic aentic pipeline dag py. Okay. Good. Yeah. So this is what we need uh for our impuse. We can I'll get you the link uh in the description below. Uh, but I'll just copy our review data sets in here so we can use that. I'll just paste that in. Okay, good. Now, this is what our data set look like for the review and I'll go through that shortly with you. So, this is our impute. Um, we've got our models in case we want to create a custom model in this case, but that might not be necessary for our use case. So, but let's continue. Now that we've gotten Apache Airflow installed in our local system, we can just say pip um Apache Airflow um we can just initialize the configurations default configuration for airflow 3. 0. So we just do airflow 3 air fo migrate and this is going to set up in here but wrongly it is doing it in the wrong location. So I need to export our airflow home to the current directory. So it's going to be airflow homem and we can just say airflow db migrate. So this will be in our current directory. So you can see that is uh in our self alien pipeline uh directory. So we have the configuration in here. So I'll just load examples change it to false and we should be good to go. So now uh with this in mind our airflow should be up and running. So I'm just going to use the same command uh airflow standalone so to can to run this uh in our local system. So we have the scheduleuler, the trigger, the API server and I think the processor as well. So yeah the processor. So everything should be working uh seamlessly. Now let's get in um what our UI looks like. So once started I just need to copy the username and password in here and put that on airflow UI. So if you go to the browser and you type in admin and the password uh which is the one inside of this um simple of manager passwords JSON generated you should be good to go. So in our case um this is this password. Now with that being said, you will see that we don't have any dags simply because we changed the configuration to load example load examples to false. So that's why you don't see those um default dags. All right. Now with this being uh at the back of our mind, we can now get started with our DAG. Now let's get in the first D. So we have our impute which is the

### [13:27](https://www.youtube.com/watch?v=As1QSF3LnvA&t=807s) The Agentic Self Healing Pipeline

academic data set to review. Uh some logs have been created. Uh but who cares about that? Now let's start with the first one. Uh let's get our SDK in. So from airflow SDK import dag task param I think get current context. Okay. Now let's get our logger in import logger. Import lo login in it. So we have logger equals to login dot get logger as the name. Now, so let's get our initial config because of our agentic pipeline. So we have our config. The config is going to be so let's start with the base directory. So the base directory is going to be os. get env. All right. And this is going to be uh pipeline base directory or we put it as um not a genic pipeline. This is going to be our current directory. So, I'll just copy this. Um, where is our path or reference? Copy the absolute path. And I'll put this in here. All right. So, this is going to be our baseline uh pipeline directory. Good. Now, uh let's remove this. This is not necessary. Good. [snorts] Um All right. So, that's our base directory. Now our impute directory impute file which is going to be the file that we're going to be using. So we have this in our join path for base directory or we can decide to put this in here. So let's say os get enget get env and put this as pipeline impute file. All right and this is going to be let's get in our base directory. So let's say I'll put it here directory data. Um this is not data is impute impute. Then we can have a yelp. Um I'll just copy the name. All right. Uh JSON. Good. Now that's our impure directory. Now the output directory is going to be the pipeline output directory or the output directory which should be fine. Now a couple of other you know relevant parameters that should be set and um environment variables. We have our maximum uh text length that we want to use. Um this is going to be um let's say 2,00. Uh you can change this to whatever you want but in our case we just use 2,00. Now our default um batch size is going to be uh this is going to be like let's say 100. So at a time we process a batch of 100 then the next time you can continue. Um so default offset is going to be zero. So you always start as level zero then uh 0 to 100 we get process first then you get the point as we go along. Now for all lama uh settings. So in here we have our Olama host. So it's going to be this. But if you haven't installed O Lama on your laptop, uh in my case I have. So you can just go to uh google. com and search for

### [17:00](https://www.youtube.com/watch?v=As1QSF3LnvA&t=1020s) Embedding AI Agents in Airflow

uh download lama for Mac. So in my case, I'm using a Mac OS. So you can just download O Lama. download it in here and you should see something similar somewhat similar to something like this. So once it's installed uh you can say hello uh literally is your chat GPT but this is the free version. You have it on your local system running. You can always um change the uh models that you currently have in here. So you should be good to go. All right. [snorts] Um we can go deeper into uh more details uh in subsequent videos uh if you want. Now the lama model that you want to serve is going to be llama 3. 2 uh just remove this llama 3. 2. Good. Now the amma timeout is going to be let's put it as 120 and uh retries is going to be let's say three that should be fine um and that's all we need to do I mean in terms of our config uh class so this config class can now be reused as many times as um as possible now let's start with our default argument ment. So this is proper Apache Airflow U default arguments now. So we have our owner is going to be I'm just going to change this to you can ganu um depends on pass is going to be false. Um let's put in retries in here as one maybe two. Um retry delay should be time delta of 1 minute. So we don't have to wait too long. Import that from date time. Good. And the execution timeout. Execution timeout is going to be 30 minutes. I mean if you are processing larger batches you might want to increase this. But in my case 30 minutes is way over time for just 100 batch which is good. All right. Now with that uh set up now let's get in our functions that will be reused multiple times for our project. So we start with load um lama model and this is uh this is something that we just have to do. You don't have to necessarily uh set this up um because the lama already have the models in place. So um this is just for you to connect to Olama that is running on your HTTP server. So you should be fine. So I'm just going to say import lama because I already installed that. Don't forget in our requirement. txt is already installed. Good. Now um let's say logger. info model name and then logger. info. Again, our host is going to be this lama host. Good. Now, the client that will be using this lama is going to be the client host. So, I think it's probably best we just set this as our host directly. So, we don't um make assumptions in here as to which parameter comes first. So, client show is going to be model name and our logger. info info just to um to tell us what the progression looks like. So we say model is available. Good. Now um except if we have a lama response uh response error as our e is going to be error loading model and this is something that we probably uh need to handle. So if let's say you try to use a model and the model doesn't exist we you should try and pull that model into the system instead of just reporting and raising this error. So let's say um logger. info model not found ugly attempting to pull that and we can say client. pull and model pool successfully. So uh in our case because we are trying to say if something happens we want to as much as possible uh clean this up so we don't run into issues. So now if you try to pull the uh model and you put the wrong uh wrong name for the model then it can now raise uh an error message otherwise you should be good to go. So we can just put a test response in here to see if our model is going to be fine. So model name model equals to model name and the messages is going to be um the row user and we say uh content [snorts] uh let's say classify this the sentiment this is a great product as positive, negative or neutral. um whatever the response is uh we should be able to assess if our model is able to correctly classify uh this before we continue. So uh we can say test result I can I need to close this. All right. So I can say test result is going to be test response for message. So we have message and the content. So we strip that and then we convert everything to uppercase. So we should see o llama test result equals to that. So um I think it should be model validation response passed. Okay. All right. That should be fine. So let's return um in our case back end is going to be lama the model name is model name um host it's going to beama host the max length is going to be that and the status is going to be loaded already whichever one um validated at in our case uh it's going to be date time dot now ISO format good and that should solve our problem uh intrinsically so you should have uh something somewhat similar to this once the XCOM runs this particular task okay good now um if this is going to be uh a model uh sorry read from let's say database or something then you probably need to have a separate uh function to do this. Uh but in our case let's load this from file uh let's load from file and this is going to be params is going to be a dictionary um the batch size batch size it's going to be integer the offset is as well. So our impute file. Okay, there's an error in here. So this is going to be like that. Good. Um, so we have our impute file is going to be params. I'll change this to params. Params dot um get. So we can have our impute file or the the directory where the impute file is set to be. So if not OS passed not exist then we say impute file not found. Um and in this case we can just say reviews equals to empty. So we can properly uh handle this um file not found error. Good. But if the fan if the file is found then we say open the file and the encoding is going to be UTF8. So you can slice it. So, less sliced. Um, it's going to be I tattoos. I slice. I'm just going to bring that in. I tattoos. Good. Um, I slice. So, we have our file, which is going to be the file we want to slice in. The offset is going to be where to start from and the batch we want to uh extract or slice from there. So, it's going to be our offset plus batch. So, let's say this is zero. So you start from zero then you say 0 to 100. So it says 0 uh to 100. Make sense? So the next one when you're in the offset one you start at with one and it's going to be uh going forward from there. Okay. So the offset is going to be controlled by uh both offset and batch. Good. All right. So for line in our sliced so we try to put this inside our memory. So we have review is going to be JSON. loads. So we load that. Uh let's import this. Import JSON. Um line. Let's strip it. Good. So we can have something somewhat clean. So we can have reviews. append. Uh but there's a problem with this. So if we put this directly then we assuming all the reviews have the same format. Uh they they're okay but we don't want to assume in this case. So it's best to handle this manually. So we have review ID to be review ID. So if this doesn't work we can just uh you know put something as a default u value in this case. So our business ID which is going to be what we have in the JSON uh in case you want to get some more information about that I can just copy this in this case and create a new uh in here. So we have uh something similar to this. I didn't install the JSON formatter. So you probably have to work with me on this that I have. So this is what it somewhat looks like. I mean the review object uh this is the structure that we have. So if you look at this so we have um review the business ID the user ID the stars the is useful if it is cool uh we have the text then we have the date. So this is somewhat what we're looking for. So in our business ID, we say review get business ID. The user ID will be there as well. The stars will be there. The text dates will be there. All right. And then if it is useful or not, we'll get that in funny and I think cool. And that should be everything. Now in a case where something like this doesn't exist in the source file, we can just u you know add a default value in here. So maybe for our stars we can put zero as the default value. The text is fine. Date is um going to be you know you can use the current date time useful might be something like this. So you can just uh default some of the some of these arguments in there in case they do not exist. Okay good. Now except JSON decode error. So we can just skip it um error um loading this but instead of erroring out. So let's just warn this uh warning. So we say let's say skipping invalid JSON and then continue. So the reason why we want to do this is we want to make sure that if there's an error um trying to load a particular review maybe it is invalid it's malformed or whatever it is then we can just keep it. So instead of you know breaking our pipeline. So at the end of the day uh we say returns and if you want to you know log something in here you can say logger. info uh loaded loaded lender review with this offset. Good. All right. So that's how to load the file. So let's get in our uh how to pass the response. So once this is loaded, we can pass this. So let's say pass Olama response in our case response text and we could be using uh this. So uh in my case I need to just do a try except in here. So I do a clean text. It's going to be response. ext text uh text text. Why do I type in text in there? All right. Um so we're trying to handle the markdown block. So if we have the three if it starts and end with that then we try to clean it up. Um [snorts] essentially um we don't necessarily need to have this as the add end. So it's better and safer that we start with this. So if it starts with this then it's a mark markdown uh block. So in this case we can just load a line. So let's say the lines is going to be clean text. Uh we split this by the new line and then we can get the clean text as um new lines. Then we join the line from one to minus one and that will be the last line. If the last line uh for minus one equals to this otherwise we just load from one to the end. So what this is trying to say is if you have um uh an output or from the from llama that looks like this. If it looks like this. Yeah. So we start from let's say response in here. So if the response is somewhat like this. So what we're trying to say is we pass from instead of starting from zero, we start from one, right? Which is what we have in here. So we start from one to the end um and up to minus one. So minus one is going to skip this last one. So it's going to only return these two lines. and if that line is uh looking like this or somewhat like this then we we skip first and the last line. But if we start with this and it doesn't have an end, we just copy everything to the end. I hope that makes sense. So that's what I'm trying to do in here. All right. So at the end of the day, I'm going to have my past is going to be JSON. So I can just load that and then we can extract the sentiment. It's going to be past get and we can get the sentiment. Um in our case if it doesn't exist we just default to neutral. Um but if it exists it's going to come through as we expect. So the confidence in this case is going to be float uh past. So um if the confidence doesn't exist uh we say it's zero and you can use any number in this case but we just want to leave it in here. Some people we say you use something like 80 uh so we can have 80 uh neutral if it doesn't exist but I mean it doesn't matter what you use but if it is neutral and is zero then it's probably a signal that the file wasn't passed correctly or the result wasn't returned correctly. Um so if sentiment is not in positive, negative or neutral then sentiment is going to be neutral. So if it is unknown we put it as neutral. So we can just return the label label is going to be sentiment and then the score in this case will be the confidence. So let's say um to better main max confidence and zero and one. So, we just want to make sure that our confidence is between uh 0 and one, not less or higher than one. And this is probably something we need to take a look at because we don't want a situation where you have 1,000 as your confidence or minus 10 as your confidence. All right? So in a case where you try to have JSON decode error uh one and it could be if you have maybe value error or you could have a key error or maybe the type error when trying to convert at any point in time. So we can just say upper text. So let's um handle this bit clearly. It's going to be uh response strip and upper. Now if positive is in the upper text then we return the label of uh label positive and let's default it to 75. Uh if it is negative then we say 75 otherwise we just return uh the label of 0. 5 as neutral. I hope this makes sense. So if we try to pass this and everything fails um we return 50% neutral. If it is positive we say positive 75% or negative 75%. And if you have a special logic for this you can always um replace that in our case. So um the next one will be to analyze um with lama. So in our case we have our yield reviews um which I'm trying to just get as many functions in as possible. Um but some people might argue that we can just uh you know get started directly. Um I mean that would make more sense as well. Maybe we can do that and just uh you know start with our pipeline. So I'm just going to create a new uh dark directory in here. a DAG function in here and I'll call this uh so DAG id is going to be um a genic pipeline DAG uh maybe self-healing selfhealing pipeline DAG all right maybe remove D all right um well that got added again so we have our default argument is going to be default ax and the description for this dark would look like this maybe We can say the pipeline for sentiment analysis using O Lama um for local. Well, I think that's fine. That's a little bit more descriptive. The schedule is going to be none. We just manually trigger this or you trigger it externally, however you want it. Now, the start date is going to be 2025, December, and 7. All right. Uh catchup is going to be false. So the tag is going to be sentiment analysis lama maybe NLP will be something relevant in this case. Um yeah that should be fine. Um params for some of the parameters that you want to add. So in our case we want to be able to add some you know impute file and all that. So let's say um impute file is going to be our compute uh config impute file and this is going to be the part where we have the reviews stored the output directory in our case I don't think we need to do this so because this is going to be by default anyway so the batch size is something we might need to consider so the batch size is going to be our default batch size um I think we already did that as 100 so that should be fine Um yeah the offset that we start with is going to be zero. So which is the default offset. So that should be okay. Now the amma model that we'll be using is going to be uh the lama model 3. 2 I think. Is that all right? 3. 2. Good. All right. Um you can leave it at this. And if you have additional parameters you want to put into this uh DAG uh you can just add them but make sure it follows this kind of um features and uh structure. All right. So that should be fine. In our case we just add one more line in here and say render template as native object. So it should be true. So that's all we need to do in this case. Now let's get our uh self-healing pipeline pipeline dark. Maybe just a pipeline should be fine. All right. So we have our first task. So we have task is going to be dev load model and in our case um load model. Yeah. So we can say context is going to be get current context to params. Okay. Already loaded for me. So we have params as a context params. The models that we want to use is the model or we default it to this. Then the model info that we want to use is going to be um model info but this is not being used. So this is probably a waste of memory space. Say return model and then remove that from there. Good. And we can just in here we can say logger. info info and this will be using model for this. Good. Now that's our load model and um yeah that should be fine essentially. So the next task is in this case will be to load our review. So load um reviews. So we load that using the same um structure. We have the context which should be extracted. We get the params which is going to be here. Our batch size is going to be from here. Then the offset the same way and we can just say load from file using the params the batch size and the offset. That's properly uh set up. So we can say logger. info. Um we say loading. Yeah, we br signs and this offset. Good. Um, yeah, that should be fine. So, we are going to have the next task as um dev uh diagnose

### [40:44](https://www.youtube.com/watch?v=As1QSF3LnvA&t=2444s) Diagnosing and Healing Pipelines

and uh eel the batch. Yeah, reviews in our case is what we passing in there. So, we get our in fact this our reviews is going to be a list of dictionaries, isn't it? Because once we we extract the reviews in this case. So it's going to return a list of dictionary. Yeah. Uh if you look at the definition in here, so it returns a list of any which is a list of dictionary anyways. So once that returns, we pass that into diagnose and ill batch and we can go through in there. So we can say uh healed reviews is going to be um uh we need to have a function to do the heal reviews. So we pass in the review uh for review in reviews. What that means is whatever function that we are using this to do the healing we go through whatever that needs to do uh that we need to do we pass in the review in there and then we return it as an array. All right. So we do the eling count in here. So we have ill count maybe count will be better. So I'm going to have for sum of one for r in this. So we try to get the heel um let's say he review if r. get. So I think let's say was healed will be um will be better. All right, good. And then we say logger. info. We say uh we healed this percentage. So we say heal this out of the size of this and you can use that to get the percentage in this case. So let's get in our function for healing the reviews. So we are going to have a function just above the dag in here and call this hill review and we pass in the review as a dictionary and this is going to return a dictionary for us. So at the end of the day we do the extraction uh get review text and we default it to zero to empty if it doesn't exist. So we have a result in our case it's going to look like this. So we have um something like this. Review ID will be extracted. We have the business ID will be get the business ID. The stars is going to be review get stars. The original text is going to be the uh let's not set it directly. So let's put it as none for now. And we can say error type. And there's a reason why I don't want to do this directly because I want to have the initial structure and just replace it um afterwards. So um error type. So if it was ill, let's put it as false for now. Um if there's an another action taken, we put this as none. All right. Um yeah, if it was if it was then we do this. Then let's add a metadata in here and we call this uh user ID is this. The date is that the useful is that the funny and the cool is that good. Now in our case um once that is done this is going to be the structure the final structure that we're going to be working with at the end of the day. So we can have um if is instance text of string um or we can just add more uh integer I'll put this in here integer float uh boolean then type of none any of them that comes through then we say our result of uh original text should be text so we just replace that with it. That's what essentially what we're trying to do. Otherwise, we just say uh what's going on e review. Yeah. So, otherwise we just say result for original text is going to be string of text. If uh text is not none, I'll just put it like if text else none. Simple as that, I think. All right. So if the text is none at the end of the day if we have the text as none then our error type will be missing text. All right or no text whichever it is we want to do. Uh the action taken was to fill in with provider or a placeholder. Um the action taken in this case is fine. Uh the yield text is going to be uh no review text is provided and was healed is true. Good. L if uh if it is not an instance of string then we say result of error type is going to be wrong type. Then we can do some try catch in here. Uh try except in here to do some conversion. Converted is going to be string of text strip. Then we do the result healed text we say is converted if converted or we say no review text provided uh except uh exception I'll just put this as general exception uh we say he text no review text provided or I can say conversion field all right Then we can just put in some action taken in here. Conversion to converted to string um and was healed is true. Maybe this type conversion might be better in this case. All right. Now um another instance is if the text is not uh text of strip. So if at the end of the day when we stripped this particular text is empty. So we can just put this as empty text and that will be fine. L if um we try to do a regular expression uh search in here. So the pattern we're trying to find is um A to Z that's A to Z or 0 to 9. So it's going to be A to Z small letter A to Z capital letter or 0 to 9. All right. So we have noninformative text. So this doesn't make sense. So it should be special characters only. All right. Um the hill text is going to be no re not no review. So let's put this at nontext uh content. We don't have to remove it, do we? So it's just non nonext context. That should be fine. And then replace with um character. So I just say not let's put this as replaced with or replace special characters was healed is true and uh this is true especially if you have something like this uh I don't know if you have this is something that is kind of common uh online uh when people want to express some uh deep thoughts All right. So we have l if so if the length of text is greater than maximum text length we just uh try to extract that uh maximum text length that we've done. So we say exit m or say we say too long that should be fine. The text is going to be we extract this uh let's minus three from here. So we can just say plus dot dot. All right. The action taken is going to be truncated. Um and if it was healed and I think that should be all if there's no um healing that needs to be done. So we just say uh text of strip and we can just say result uh was healed is going to be false. All right. And at the end of the day that's all. Uh we just return result. So that should serve us pretty nicely. Um yeah, that should be okay. Um yeah, let's continue. Now the healing has taken place. Um we've done the healing. Let's see where we are. So we have diagnose and heal batch. So for the reviews, we are good to go. And we have delivered the healing of the reviews. Now let's do uh we've cleaned the data everything should be fine now so we should be able to analyze our sentiment so we can say batch analyze uh sentiment and this is going to be a review um okay that get populated uh review then the model information is dictionary um so let's review this so uh we first need to handle this if not hailed reviews. Uh we no reviews in this case. All right. Um logger. info. Maybe we need to remove this. Um we're trying to analyze this for now. And we say return analyze with lama. And um we pass in the ill review. And we're good to go. — [snorts] — Um, yeah. So, that should be fine. So, let's put this inside a function. I'll probably delete this for now and you know move this up say devama and we have the yield uh reviews uh which is going to be a list of dictionary and the model information. So we're going to be returning a list of dictionary as well. So that should be fine. So I have import lama and then import time as well. That should be okay. Now uh our model name is going to be model info get the model name. Uh host is going to be model info geta host. So we can get client established now. So we have client it's going to be client that should be fine except this um logger error fail to connect to host. I think this is kind of like the basic stuff. So we can say um create degraded results um for this review and this is going to be string of e and the reason is because uh in a case where we have some degradation that has occurred maybe the amma service is not available we need to be able to handle this and we'll come back to this much later but for now let's continue all right uh result is going to be empty array. So we have our total that we want to be working with as length of a review. Now um we can each of these uh reviews we can just loop through them and get our uh reviews in place. All right. So we have enumerate heal reviews. So our text is going to be revealed if text and the prediction is going to be null. All right. So uh for attempt in retry. So if you try the first or second or over how many times that we set. So we have this in here. So we have classify the sentiment. Um okay I have to probably change this. So let's review this correctly. So we should have something like this. All right, I'll change this. Delete that. So I'll say analyze the sentiment of this review and classify it as positive positive negative or neutral. new draw. All right. Now, I need to put in the review. So, um I'll just in fact I'll change this to three lines. So, we can have something like this. All right. multi-line. All right. I'll delete this. And I'll have my review is going to be whatever it is that I put in here. So, I'll put this as text, right? Maybe this should be better so it's easy for me to format this correctly. Um, yeah. So, I have my prompt as this and I'll put this as F. So, I can just put the um data in there correctly. All right. So I can just say respond or reply uh let's say reply with only uh JSON object and the format is going to be something similar to this sentiment um let's see sentiment is going to be positive all right and the confidence is going to be 0 95 something somewhat similar to that. All right, I think that's all we need to do. Uh yeah, let's test this out uh at the end of the day. So we have um our prompt in there. So we say client or chat, the model name that we're using, the role is user and the content is the prompt, which is good. The temperature we should set. So let's say our temperature is going to be as close to one as possible. So uh zero as possible. So I'm going to have this as temperature options. It's going to be temp temperature at 0. 1. All right. Not too far from zero. So we don't introduce uh unnecessary in this case. So um I don't need the timer do I? But you can set that if you want. My response should be fine. So let's extract the response. So we have the message content and let's strip this to avoid the spaces and all that. So the prediction in this case is going to be pass response and that should be fine. So break it up and exception. So if um if something is wrong and we are not able to pass this correctly. So let's say if attempt all right uh because you already have attempt in there is less than config uh retries minus one. So we say attempting this. So sleep for 1 second and uh else uh yeah just return neutral in this case neutral instead of zero let's wait at 50%. All right and the error message can be put in there. Okay now that should solve our problem in the first instance. Um yeah so our try block should be okay. So, we just need to handle this um outer block in this case. So, that would be Yeah, I'm in the right place. So, I have my idx + one. Uh yeah, that should be fine. Okay. So, at the end of every 10 block, so uh more like my checkpoint in this case. So I'm going to have 10 um messages in this case for processing 100. So you have 10 over 100, 20 over 100, 30 over 100 and stuff like that. I hope that makes sense. Good. Now our results can be appended. We can just append this correctly. So we have um review ID will be this. The business ID will be that. The stars will be this. Um the text is going to be this. Then the original text um original text will be that um predicted sentiment it's going to be that which is the label in this case. Um the confidence score um not confidence score I'll probably use confidence in my case. Um I'll just say I'll run this up round this up to maximum of four decimal places in it. Good. Now if there's um yeah if there's status I get that in I'll say healed if reviews were ill otherwise the I'll just put this as success right and uh let's get in our healing applied review dot get was healed and healing action. Healing action will be the action taken if review was healed otherwise none. Uh instead of this should be none. Good. The error type is going to be this. Um otherwise is none. Then the metadata will be that. Good. And that's all we need to do uh to get our results in. So at the end of the day, we just say logger. info inference complete length of um results processed. Good. Um I'll just put this out in here. I just return results. Good. Um, so I have my review. Let's just confirm this correctly. So we have my loop in here. I append that and at the end of the day I return the results. Good. Now uh let's finally handle the degradation in this case. So I'm just going to create another function in here. I'll call this dev create a degraded result. Um so we have healed reviews for the my list of dictionary. So I say uh just put this like this. I'll say return uh I'll just copy everything from review. If I copy everything from there I just append this with results uh with some additional uh keys. In this case, text is going to be ill text. So, we have predicted sentiment is going to be neutral. The confidence is going to be 0. 5 and the status is going to be degraded. Uh the error message is going to be the error message and that should be fine. So, in my case, I have healed reviews. So, for review in heal review and that should be fine. All right, that's all we need to do in our case. Now, um if you take a look at our task again, uh sorry, our DAG again. So, at the end of the day, you have a batch analyze sentiment, which we already did. Now, now we can just aggregate the results. So, we can task say dev aggregate results and my result is going to be my list of dictionary. So let's handle this. So we have context is going to be get current context. The params is that the results is going to be list of results. Okay, good. Now the total in my case is going to be this. Um so our success count is going to be sum of one for our in results. If r get status equals to success, our yield count is going to be healed. Our degraded count is going to be degraded and that should be fine. So we can just say sentiment uh distribution is going to be positive negative neutral. That's fine. then loop through to say um uh I'll just change this. So I'll say sentiment that's not properly written. So sentiment is going to be our get uh predicted sentiment and default it to neutral uh that should be fine plus one. All right. Um the eling statistics in this case uh we have elen stats for our in results uh we get the aling applied uh the error type is going to be unknown and the alen type action is going to be something similar to that. All right. Uh the same thing if you have uh if you want you can you know get some additional uh stuff in you know checking the confidence and the stars and all that. So but let's just let's just do that star sentiment is going to be for R in result. So I say um get the stars. Yeah the sentiment is going to be maybe neutral in this case. So if stars is in star sentiment um then we should be good. Um maybe not star sentiment. So I'll just say get this. So if stars is not in there. All right. My R. Yeah. So my R is in result. So I say uh sentiment. Uh let me see. Am I doing something wrong in here? So my star sentiment let me take a look at that again. So I will get results. So I'm be passing that. So I have my list of results. Okay, I get that. Yep, that should be fine. And yeah, so yeah, taking a look at this again. So I have my stars, I have my predicted sentiment. So I'll just get a key in here to make things uh easy. So I'm going to have the int just put this inside of this curly brace. So I have my int of stars and I put this underscore star. So maybe five star, three star and other. So if the key not in star sentiment I'm going to have my star sentiment as either positive, negative, neutral. Okay. And at the end of the day, I put this in here and that should give me my star sentiment. So, uh, my output will look somewhat like this. Uh, expected output would So, if you have one star, you see something like this. Two star, three that. So, that's what I'm trying to do in here. All right. Now, um, finally, we have our confidence. to um confidence by the status. This is just uh to get some statistics in my success is going to be um status is going to be get that and I'll default it to success. Um maybe not you can just you know get this directly so you don't have any uh bias in case something is wrong or whatever. So I get the confidence at zero then if status uh is in confidence by status then we can say confidence by status append and the average confidence is going to be as simple as this uh for status in that and we're good to go. So that will be my average confidence. So we can have the average um confidence across all uh inference that we've just made for this batch in particular. So our summary in is going to look like this at the end of the day before we write this to um our final location. So we have our run info. We're going to have something like this. Uh time stamp is going to be now uh the batch size is going to be this. Uh offset is going to be that. The impute file impute file is going to be this. All right. Um let's get in our totals. Yeah. Our totals in this case will be totals would be uh processed be total success um just success count the healed count the graded count and that's fine. We have our rates which is like a percentage isn't it? So we have our success rate is that um I'll change this. So I'll say success count uh let's just put in some random figure in here. Round success count. Um I'll have max total one. Yeah, that should be fine. Uh we're rounding that up to four decimal places as well. uh degradation rate is going to be similar to that as well. Good. And let's get in one or two final key. Sentiment distribution is that uh the alien statistics is fine. The star confidence sentiment uh correlation uh correlation. All right, that is the start sentiment. The average confidence by status uh average confidence is fine and the results is going to look like this. Good. All right. Um so finally let's write this to some location. So I say os um dot make directories if it doesn't exist anyway. So we say config uh output directory exist. Okay. If it doesn't, it's fine. Time stamp, it's going to be data date time now ISO format. Um, let's just format this, not ISO format. STRF time, hour, minute, seconds. Um, the offsets is in this case is going to be offset, right? Then the output file is going to be sentiment analysis summary. Um, so I have sentiment analysis summary. Uh, maybe I start with time stamp first. All right. Then I have my offset at the end. Okay. So that should give me a clear sense of what offset we are working with in this case. All right. So we just have we open this as dump it as it is. Uh maybe the indent should be two not four to avoid bloated file. All right. So I'm going to have my default default is going to be string and the a is false as key not as [gasps] logger. info info we just say um summary written to that uh log. info info I'll say process this total and the delation count is that and I can just say return um let's get in some key value pair KV all right KV for KV and summary items uh if K is not equals to result that's fine all right last and final task in this case which is going to be um more like the health report. Um this is entirely optional. Uh so you don't have to do this but if you want it's something that is easy to just um generate a report. All right. So we just pass in the summary as a dictionary. Um okay. So let's see what that look like.

### [1:11:44](https://www.youtube.com/watch?v=As1QSF3LnvA&t=4304s) Generating Health Reports

Uh the summary getting our total uh healed count uh total success yield and fine. So we we had a couple lines in here. So we say if our degraded if degraded uh is greater than total times 0. 1%. Then health status is going to be critical critical. All right. If degrade L if our degraded is greater than zero we just say health status is going to be degraded. All right. Um L if our yield is greater than total times 0. 5% healed isn't it not health healed. Yeah. So we have our health status as uh maybe warning house we say is healthy. Good. Um, which one is this? Uh, pipeline reports. Okay, I'll change this. So, we don't need this type of report. So, I just have my report like this. Um, that's going to be a pipeline. It's going to be all right. Self-healing pipeline. The time stamp is this. The health status is that run info is going to be summary run info. Uh the metrics that we're going to be working with is this uh metrics. Yeah, let's get this as JSON. So we have total processed success count, degrade count, success rate, healing rates, degradation rate. I don't think we need these counts anymore. Just the rate should be fine, you know. So we just get those in. Uh sentiment distribution is fine. Uh the alien summary alen summary is that okay? Oh right. So reports in it. All right. So alien summary. Okay, alien summary should be fine. So we have our alien statistics and then the average confidence is that okay that's all we need to do. Uh logger. info this is fine. Uh logger. info info we just say success rate is that success rate degradation rates that's it uh that's all we need to do so just return uh report all right so that's all uh in our case so the next thing is just to get our data flow uh in the right shape and form and we should be uh good to go so in our pipeline. We have our load model. We have reviews is load reviews. Then in our stage two, we do healed review to diagnose and heal the batch of reviews that we want to work with. Our step three in this case will be the analyze uh the sentiment uh for each of the reviews. At the end of the day, we get in our summary. Then we generate out report. Um that's it. [snorts] So we can just say our um our dag instance is that uh yeah that should be okay. Uh anything in here summary we're not using success are we? So we can change that uh anything in here. So we say our R in our result. So yeah, that's this

### [1:16:12](https://www.youtube.com/watch?v=As1QSF3LnvA&t=4572s) Results and Review

should be fine. So it's it doesn't understand this is a list. So yeah, that's that should be fine. Anything else that we're missing? Yeah, let's take this for a spin and see if our Lambo is going to crash on autoban or not. So I just refresh this. I have my self alien pipeline. The fact that you can see it in here means there's no errors in our syntax axis. Otherwise, we probably wouldn't see it in here. All right. So, ah, let's take this for a spin. Trigger this. So, the good thing is our ROM parameters are in shape now. So, the location of our Yelp um data set review. And in our case, we are starting with batch size of 100. So, in here, the offset is going to be this. The llama model that we're going to be using is this. All right. And we trigger this. Let's see what that look like. Okay. So, if you look at the flow, we have our load review is trying to load the model, diagnose and heal the batch and do the batch analysis. So, let's take a look at individual uh level. So if you look at the XCOM, we have 100 reviews in here as expected 0 to 999 which is good. So going back to our load model, uh if you take a look at this, I'll just wrap this up. So if you says it says the sentiment of this statement, this is a great product is positive. It expresses enthusiasm. All right, [snorts] so something is wrong in this case because we just say only uh return positive and all that. So but it's this is going to become a problem. It have to be uh as specific as possible either positive or negative. All right. So let's see uh we change this. We need to change this uh from here. So going back to our load review is loaded. Diagnosing and healing the batch. Um I'm not sure we probably will see some heals. So uh was healed. Let's see if we can get some healed was he healed and this is going to be true. So there's one true in here which is this. So it was truncated text. So it looks like the text was too long and it was truncated and that should be fine as expected uh when it was trying to do this. So we say uh let's see the first one in here analyze of uh analysis of the sentiment. So let's go to the XCOM for more uh better presentation. So we have uh this. So the original text is this. The prediction is neutral. And let's see if this is actually a neutral sentiment. If you decide to hit here, just be aware that it's going to take about 2 hours from beginning to end. We have tried it multiple times because I want to like it. I have [clears throat] been to its other locations in New Jersey and never had a bad experience. The food is good but it takes a very long time to come out. The weight staff is very young but usually pleasant. We have just had too many experiences where we spent way too long waiting. We usually opt out for another diner or restaurant on the weekends in order to be done quicker. Well, kind of sent neutral if you ask me. It wasn't as um you know seriously uh let's see this one. It says 95% positive. This is saying I've taken a lot of spring classes over the years and nothing in compress body cycle from the nice. Looks like this is nice. Yeah. Uh do we have any negative? Negative. So it says I'm a longtime frequent customer of this establishment. I just want I just went in to order takeout for three apps and I was told they too busy to do it. really the place is maybe half are full at best. Does your no uh okay this is negative. Yeah. So it's not healing anything but it's definitely negative. So yeah and the confidence is like 100%. So it's super confident. Um so there's a problem in this case because uh the result that our model is returning is not what uh you would properly probably expect. Uh let's see why is this like that it says the type error in string. Uh all right so it's in the line 3 it is 368 in upgrade result. Let's see 368 in aggregate result 368. So it says uh we're trying to get the predicted sentiment. So uh where's our star sentiment 368 it says for stars right? So we are saying if stars not in sentiment it doesn't make sense. Uh we should say if stars and sentiment. So if we have stars and we have sentiment that's when we can proceed. Not in yeah that's the problem in there. Yeah. Let's retry this again. Uh maybe we repass the D in it. Repass the D and trigger it again. That should be fairly fast and quicker. So it's queued up. Yeah, loaded the model very fast this time. Batch loading that which is good. Analyzing the same batch again. So 10 is done. Let's see how long it takes to finish that. So, it takes about maybe 10 seconds roughly to finish 10. So, it's like a second for each review. Uh sometimes less. Yeah, between 7 to 10 seconds. All right. I mean, I can work with that. It's just uh by the time we want to trigger this in parallel over multiple batches, then this is probably where we should uh we should be a little bit more careful. All right. So, um 70 over 100, which is almost done. 80 is done. And you can see it's like consistently between 7 and 8 seconds. um between that I mean it's not too bad if we have a faster um machine then we can it can be faster and you can see now that we fix that then our results should be fine. So if you check the XCOM and uh try to expand this all uh you can see that we have the right uh results and everything seems to be okay now. Uh we have 95% 95 uh 95 good and let's take a look at our output. In our output directory you can see the sentiment analysis summary which is somewhat similar to what we have. This our run info and the result is here. So the the average is about 92% success rate. Uh heal is about 95%. Um I can see the statistics for the stars. Neutral positive. Yeah, you get the point. Um let's see the totals. We only healed one in this batch. And this is really good at the end of the day. Uh we have 100 process success is fine and one healed. Now uh the next batch that we probably would run would be uh from our batch size of the same 100. Now the offset is going to change. So if you change this offset to one then if you can still recall um batch plus I think batch plus you see offsets plus where did we use plus? Uh yeah this one. So we said uh when it's trying to do the slicing so we are saying start from this offset whatever the offset is let's say we start from 0 to 100 and from 101 is 99 so from 100 is where we want to pick up again. So the offset is going to be um from there onward. So in this case if you want to start from um 10 100 going forward. So instead of having to start from 0 to 100, you're going to have um in this case, so if you start at 100, uh you're probably going to start from 100 to 200. Make sense? So if you change this to one, then it's going to be from 1 to 100, but we should probably start from 100, then plus 200. I hope my mathematics is clear. Sometimes I get confused with that as well. All right, so it's picking that up again. So let's see. Loading the review. So it says offset of this. So it start from offset 100 and it goes on to 200 at the end of the day. Make sense? So healing the batch. So let's see if we have healed. True. No healing in this case. Now um what one thing I want to probably show you is uh we can create a batch runner script in this case um that can help us um make things faster. And if you create a new folder in this uh directory, I'll call this script. And I'll maybe call this batch runner, no py. Um I'll just paste this in here. In our batch runner, what we're trying to do is we want to split everything into batches and we can just trigger everything uh to start from uh from whatever it is that we're going to be running. So if we have 5 million records um with batch size of 1,000, this is going to be in sequential order. So we start with this uh if you specify 1,000 batch uh sorry 5 million batch in batch size of this, we're going to be running this in about 5,000 times and maybe not so much effective. And you can also resume your batch uh process in parallel. I mean five uh parallel dags. And you can also resume from previous offset if you prefer. I'll just leave this here in case anybody needs to uh to do this uh on their own. And you can just, you know, create a function that does this by yourself. Um yeah, that should be fine. Uh so if I want to trigger that, I can just say uh you know, Python script batch runner. So instead of running five five00,000 5 million, I'll probably run 5,000. No, let me run 500. The batch size is still going to be remain uh 100 and it's going to be running this in parallel. So if I run this, so I'm going to have the total records that is going to be right. Uh, I need to export my airflow in here as my current directory and rerun this again. Okay, I think I changed the name of the DAG. So, let's see what our DAG name look like. Uh, where's our DAG? So, selfim pipeline. Yep. So, I'll just change this from here and that should be fine. So if you try this again, you probably would have triggered five uh in parallel and you can see we have five uh ducks that are triggered at the same time. So this is probably going to save you some time instead of having to you know trigger this multiple times and you can also look at the templates that was used to trigger this individual uh pipeline. So we have the batch size is 100. The offset is that this is going to have a different offset. You see the batch size offset is 200. Again the next one is maybe 300. Yeah. And um the next offset is going to be zero. That's fine. And this is going to be 500 400 rather. Okay. So this is how to you know trigger multiple uh runs in parallel. And this is for the first 500 uh at the end of the day. So you have 500 uh batches already uh triggered. And you can you know take a look at individual uh processing that is trying to do 10 over 100 is fine. Uh the only problem with this is because everything is going to be running through the same um Olama model then you probably need to uh be wary of what the size of the data that will be used or the model size uh what that will be look like. So at the end of the day um I'm running this in parallel for five um five separate uh runs or triggers. uh you can continue to do this uh if you want and you can increase the batch size. Anyways, you're not going to be sitting down and monitor what the pipeline is doing at the end of the day. You probably will trigger this and you know do some other thing but at the end of the day uh this batch runner process

### [1:30:00](https://www.youtube.com/watch?v=As1QSF3LnvA&t=5400s) Outro

uh that simple script is going to help you to trigger the pipeline multiple times in parallel uh depending on the batch size that you specify. In my case, if you take a look, uh in my case, I specify a batch size a total of 500 and a batch size of 100 uh which is going to be running everything in parallel for five. So 5 * 100 at the same time, everything should get fixed. But in the case where you have let's say 5 million and all that, then you probably need to fine-tune this process a little bit. maybe increase the number of parallelism that you have and the size of your batches because if you have let's say um 5,000 batches, it's going to take a while and most likely it's going to just trigger everything at the same time then queue them all. So as one is finishing the other one is um picking up it in its place. uh but yes um what I also want to do and do the final review in this case if you take a look at this we have our airflow which is properly uh running and scheduled uh as accordingly but what one thing that you probably want to do is um maybe review uh the source system in our source system we are doing uh in our DAG we are doing a load model no sorry load reviews in here we are loading from file I mean if I were you probably if you have this in a DB somewhere or somewhere else you probably would have dev load from database and if it is like a streaming in this case you probably would have loading from Kafka or something similar which is more like a production ready system. So you just wait for a consumer to get triggered. So you're going to have a consumer listening for events in this case and that's going to be running uh as it gets the message it triggers the DAG and then you know keep running that end to end. Yeah, but that's everything end to end. Uh I hope uh this kind of give you some insight into how best you can uh implement some self- alien pipeline uh using you know a model. In this case we're using old lama. Um but one thing I also want to make you understand is this output directory. You can you know just connect this to something like PowerBI. Uh this you're not going to be connecting to individual file. It's going to be the base folder because they have the same structure. It's easy to just explode them in PowerBI or any other BI tool that you prefer to use and just use that in your analysis or the UI analysis that you want to do. So going back to our UI. So we have about 70 of 100 in this case. Uh almost the same thing. So most of them all of them will finish at the same time. Uh most likely but the only thing is if you notice that before now we have 7 to 10 seconds. Now it's about 35 seconds because now everything is running in parallel uh trying to compete for uh request fulfillment by the Olama model. Uh but yeah, regardless, this is how best you can um work with this on your local and this probably gives you some insight into how something like OpenAI or Cloud or Antropic or anything serve their model and you know let it to uh respond to request uh from users. So finishing up with this, I'm just waiting for this to be completed and if anything else we can just pick it up from there. Yeah, this is done and the rest should get completed very fast now. Yeah, everything gets completed roughly at the same time. And if you check the output, you can see that um for each of them we have um this is where we started from. So offset 10 0 200 300 400. So we have about because they're in the same batch anyways. So we have about 500 uh already processed now. And yeah, that makes sense to me. I hope it does uh make sense to you as well. Uh so thank you very much for watching. If you have any questions, please drop them in the comments section and I'll pick it up uh as soon as possible. Thanks for very much for watching and I'll see you in the next one. Chess.

---
*Источник: https://ekstraktznaniy.ru/video/52950*