CQRS in Python: Clean Reads, Clean Writes


ArjanCodes · 40,231 views · 900 likes


Video description
💡 Check out MongoDB Atlas: https://www.mongodb.com/atlas

When your FastAPI list endpoint starts pulling full documents just to compute derived fields, it’s usually a sign your read and write models are drifting apart. In this video, I refactor a tickets API (FastAPI + MongoDB) using CQRS, explain projections and eventual consistency, and show when this architecture is worth the added complexity, and when it’s not.

🔥 GitHub Repository: https://git.arjan.codes/2026/cqrs
🎓 ArjanCodes Courses: https://www.arjancodes.com/courses
💬 Join my Discord server: https://discord.arjan.codes
⌨️ Keyboard I’m using: https://amzn.to/49YM97v

🔖 Chapters:
0:00 Intro
0:48 The Starting Point: A Perfectly Reasonable FastAPI App
3:53 Where the First Real Pain Shows Up
5:28 CQRS Explained
6:50 Example: Requirements Change
7:59 Separate Command Intent
12:36 Split Write Storage and Read Storage
13:00 Why MongoDB Fits Nicely
14:33 Step 4: Add a Projector
18:49 What Improves After the Refactor
20:39 When to Use CQRS
21:09 When Not to Use CQRS
21:44 Final Thoughts

#arjancodes #softwaredesign #python

Contents (13 segments)

Intro

Let's say you're doing customer support for OpenAI and you want to compute some data to get an idea of how broken your product is, but your API is a bit limited and can only return all ticket data at once. That is probably a lot of data. On top of that, say you're running an analytics script that gathers that data while support tickets are flowing in: your reads and your writes may interfere, and that leads to all sorts of performance issues. You can avoid those problems using the CQRS architecture, and I'll explain exactly how it works in this video using an example with FastAPI and MongoDB. I'll talk about when to use it, when not to, and why MongoDB actually fits this architecture really well. That's why they're also the sponsor of this video.

The Starting Point: A Perfectly Reasonable FastAPI App

Let's take a look at the code. Like I said, the starting point is a FastAPI app that uses MongoDB. First, I created this file that uses pymongo to set up a MongoDB database. It defines a URL (by the way, I'm running MongoDB locally on my machine with Docker Compose), then a database type, and it creates a client with that URL. Then I have a couple of functions that are useful in my app: getting the database, which will be injected into the endpoints, shutting down the database, and some helpers such as converting strings to and from ObjectIds.

The API app itself is also pretty straightforward. I have a collection of tickets, customer support tickets. These tickets can have a status that's either open, triaged or closed. I have a couple of other helper functions, like getting the current time and making a preview of a message that's part of a customer support ticket. Then I create the app. There's a startup phase, which gets the DB and sets some indices that help with the common queries we do. Then there's of course shutting down the database when the app shuts down. And then there are a couple of Pydantic models: one for creating tickets and one for updating tickets. As you can see, we can update the status or add a note to a customer support ticket. Then we have a ticket details model that contains basically everything related to the ticket, and another model called ticket list item that I'll talk about in a minute.

And then we have some endpoints. I can post to the tickets endpoint, which gets a payload (this ticket in model) and then creates a ticket. We have updating a ticket, which is actually kind of a complicated operation: first we have to check that the ticket actually exists, and then we can only update certain things in certain ways. For example, if the status is closed, we cannot reopen that ticket in this particular setup.
For updating the agent note there is actually no rule, though; we can simply update that. Then I have a list tickets endpoint that retrieves tickets; I'll talk more about that in a minute. We have retrieving a particular ticket given an ID, and I have a sort of dashboard endpoint that does some aggregates, like counting how many tickets are open, triaged and closed. So that's the API, and it interacts with the database that I created right here.

Let's say that there is a UI requirement that the list view needs some extra information, like a short preview of the agent note or a has_note Boolean flag. If you just implement that right inside this FastAPI app, you would have this ticket list item model that contains the preview message and a has_note Boolean value. Then, in the endpoint where we list the tickets, we would return that as the response model.

Where the First Real Pain Shows Up

That's already where you see a problem occurring, because in this list view we of course need to compute this preview and compute whether there is a note. That means that every time you list the tickets using this endpoint, these things are going to be computed. Now, of course, computing a preview message and whether there's a note is not really that computationally intensive unless you have a lot of tickets, but you can imagine other derived fields that add extra computation and extra problems if you need to compute them every time you list the tickets. As you can see, that happens right here: I call a make preview helper function that does this job, and I turn the note into a Boolean. Again, here it's not a big deal, but if you have many tickets this kind of thing can add up. At the same time, in order to make this preview and determine whether there is a note, this endpoint does need to retrieve all that data. So there is that problem as well, and if you have many tickets that can be problematic.

So the problem here is that we have certain read requirements that directly affect the code and the models that we use. And if you're not careful, the write model might get reshaped just to make reads easier. Maybe we decide to add the message preview and the has_note flag to the document, and then it's easy to end up with all sorts of inconsistencies later on. When you start seeing these kinds of things in your code, that's the point where CQRS perhaps starts to make sense.
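To make the cost concrete, here is a minimal sketch of a list read that derives the preview and the has_note flag on every call. It is not the code from the video: plain dictionaries stand in for MongoDB documents, and the names `TicketListItem`, `make_preview`, `agent_note` and the 40-character cut-off are assumptions made for illustration.

```python
from dataclasses import dataclass

PREVIEW_LENGTH = 40  # assumed cut-off; the video does not state one


@dataclass
class TicketListItem:
    id: str
    subject: str
    status: str
    preview: str
    has_note: bool


def make_preview(message: str, length: int = PREVIEW_LENGTH) -> str:
    """Shorten a message for display in a list view."""
    text = message.strip()
    return text if len(text) <= length else text[:length].rstrip() + "..."


def list_tickets(tickets: list[dict]) -> list[TicketListItem]:
    # Every call needs the full message and note of every ticket,
    # and recomputes the derived fields from scratch.
    return [
        TicketListItem(
            id=t["_id"],
            subject=t["subject"],
            status=t["status"],
            preview=make_preview(t["message"]),
            has_note=bool(t.get("agent_note")),
        )
        for t in tickets
    ]


# Hypothetical sample data standing in for full ticket documents.
tickets = [
    {
        "_id": "1",
        "subject": "Login fails",
        "status": "open",
        "message": "I cannot log in since this morning, please help me out.",
        "agent_note": None,
    },
]
items = list_tickets(tickets)
```

The work per ticket is small, but it scales with the number of tickets and with the size of the fields that have to be loaded just to throw most of them away.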

CQRS Explained

What is CQRS? It stands for Command Query Responsibility Segregation. The idea is actually quite simple: you have commands and you have queries. Commands change state, so they update things in a database, and queries read state. The interesting thing about CQRS is that these two things don't have to look the same. They can be completely different, and they can also be optimized very differently. You can even split up the databases, so that your commands work with a different database than your reads. Now, CQRS does not automatically mean that you also need a bunch of microservices. It's really about splitting commands and queries, writing state and reading state. It also does not require event sourcing, even though the two are often mentioned together, because event sourcing actually works pretty well with CQRS: you post your events, which are basically the write operations, and you also have a read model where you store snapshots or other kinds of aggregates based on those events. In this example, everything is going to be simple. There's just going to be one FastAPI app and one MongoDB database, but within that database I'm going to create separate collections for reading tickets and for modifying tickets.

Example: Requirements Change

Currently there's a single tickets collection, so the first thing I'm going to do is split it up into two collections. We have the commands collection, let's say ticket commands, and the reads collection, ticket reads. So we'll use two different collections here: one to modify tickets and one to read tickets. That also means we need to project: if you create a ticket, you need to store that ticket in these two different places. I'll also remove the old collection so I can start updating the code everywhere and see where the problems are. The first thing we need to do is create some indices. We have status and updated_at, so let's say these are indices we want to put on the command side of things, like so. We can also add them on the read side, so on this collection we create the status and updated_at indices as well, and I add has_note because that's an extra thing we're going to need on the read side. Currently I have this model for patching the ticket.
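The index setup for the two collections could look roughly like the sketch below. It assumes `db` behaves like a pymongo `Database` (subscriptable by collection name, with `create_index` on each collection). The collection names `ticket_commands` and `ticket_reads`, and the choice of separate single-field indexes rather than a compound one, are guesses based on what is said in the video.

```python
def ensure_indexes(db) -> None:
    """Create indexes on the write and read collections.

    `db` is expected to behave like a pymongo Database; nothing here
    imports pymongo, so the function only relies on that interface.
    """
    commands = db["ticket_commands"]  # assumed collection name
    reads = db["ticket_reads"]        # assumed collection name

    # Write side: the fields the single collection was indexed on before.
    commands.create_index("status")
    commands.create_index("updated_at")

    # Read side: the same two, plus has_note, which only exists here.
    reads.create_index("status")
    reads.create_index("updated_at")
    reads.create_index("has_note")
```

A function like this would typically be called once from the app's startup phase, right after the database handle is obtained.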

Separate Command Intent

If you want to split this model up into commands, you could create an update status command, which checks that the status is not closed, because you can't reopen closed tickets, and an add agent note command, which is less restrictive. So what you can do is split up this class. I'm going to copy it and call the first one update status, which of course doesn't need the agent note, and call the second one add agent note and remove the status from it. Now we have these two patch options.

Now we can add some commands. For example, let's say we have an update status command. It gets a database, which needs to be injected, and we provide it with an instance of this particular model. It returns None. Then we can take the part of the logic that's in the patch endpoint, copy it, and edit it here. We don't need to get the database because we already have it. Next to the command, I also need the ticket ID. Of course this is going to use the commands collection, and we remove the agent note part. Then I can update the rest of the logic.

We can also clean things up a bit. I think the commands, which really work on the data level, don't need any knowledge of the API. So a command shouldn't raise an HTTP exception; I think it should raise a ValueError instead, and then of course there's no status code. Then we need to move the business logic in: the closed-ticket check now has to refer to the command's status, and it's also going to raise a ValueError. And then finally we can perform the update. So this is nice.
Now our business logic is right here. Next we can create an endpoint for updating the status. I've already prepared that, because otherwise this video becomes too long. The endpoint looks like this: when updating the status, we get the database and then we simply call the command. That also means our routing is now nicely separated from the command, which is cool. Similarly, we can create a command for adding an agent note. It's almost exactly the same: we get the ID, we check that the ticket actually exists, and then we simply update the commands collection with this agent note. Again, very simple. We can also add a separate endpoint for this, just like I did with the ticket status, and I removed the update endpoint since it's now replaced by two separate endpoints.

The third command that we have here that's also writing stuff is creating a ticket, so we can add a command for that too. It also gets a database, it gets a ticket in model, and we want it to return a string ID. I'm going to take the creation code and move it over to my create command. Of course this also needs to use the commands collection, and then I simply return the inserted ID. So that's creating the ticket. In my endpoint, instead of doing all of this, I can create my ticket with the payload. We could then retrieve the ticket from the database again, or we can keep things simple in this particular example and just return a dictionary with the ID. So now we've separated routing, the API bit, from the actual commands.
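The shape of these three commands can be sketched as follows. This is an illustration under assumptions, not the repository code: a dictionary stands in for the commands collection, dataclasses stand in for the Pydantic models, and all names (`CreateTicket`, `UpdateStatus`, `AddAgentNote`, `handle_update_status`) are hypothetical. The point is that commands raise `ValueError` and know nothing about HTTP, while the route-level wrapper does the translation.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import uuid4

# In-memory stand-in for the write collection (assumption for this sketch).
Store = dict[str, dict]


@dataclass(frozen=True)
class CreateTicket:
    subject: str
    message: str


@dataclass(frozen=True)
class UpdateStatus:
    status: str  # "open", "triaged" or "closed"


@dataclass(frozen=True)
class AddAgentNote:
    agent_note: str


def _now() -> datetime:
    return datetime.now(timezone.utc)


def _get_or_raise(store: Store, ticket_id: str) -> dict:
    ticket = store.get(ticket_id)
    if ticket is None:
        raise ValueError("ticket not found")
    return ticket


def create_ticket(store: Store, cmd: CreateTicket) -> str:
    ticket_id = uuid4().hex
    store[ticket_id] = {
        "subject": cmd.subject,
        "message": cmd.message,
        "status": "open",
        "agent_note": None,
        "updated_at": _now(),
    }
    return ticket_id


def update_status(store: Store, ticket_id: str, cmd: UpdateStatus) -> None:
    ticket = _get_or_raise(store, ticket_id)
    # Business rule: a closed ticket cannot be reopened.
    if ticket["status"] == "closed" and cmd.status != "closed":
        raise ValueError("closed tickets cannot be reopened")
    ticket["status"] = cmd.status
    ticket["updated_at"] = _now()


def add_agent_note(store: Store, ticket_id: str, cmd: AddAgentNote) -> None:
    ticket = _get_or_raise(store, ticket_id)  # no further rule for notes
    ticket["agent_note"] = cmd.agent_note
    ticket["updated_at"] = _now()


def handle_update_status(store: Store, ticket_id: str, status: str) -> tuple[int, dict]:
    """Route-level wrapper: the only place that knows about status codes."""
    try:
        update_status(store, ticket_id, UpdateStatus(status))
    except ValueError as exc:
        return 400, {"detail": str(exc)}
    return 200, {"id": ticket_id}
```

In a FastAPI endpoint the `except ValueError` branch would raise an `HTTPException` instead of returning a tuple. A real app would probably also distinguish "not found" from "rule violated", for example with two exception types.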

Split Write Storage and Read Storage

The next thing we need to do is actually separate write and read storage. That means we have ticket commands, which is where we're currently writing things to, and we'll also have ticket reads, where we're reading things from, and then we need to project between them. After that, commands only write to the write model and queries only read from the read model.

Why MongoDB Fits Nicely

That's also why I like using MongoDB in this particular example: it's really built around working with application data, and it doesn't force data into a fixed shape. For CQRS specifically, the document model of MongoDB maps pretty naturally to read projections, and the schema is flexible, so the read model can evolve independently from the write model. Splitting things this way also means that the read side is eventually consistent by design. In practice, that means a write might take a short moment before it shows up in the read model, because we have to do that projection. So for a brief window of time, a list or dashboard might be slightly out of date. For most dashboards that's fine, but it is something to be aware of. Another thing we can do is use the same ID across these different collections to keep the projections simple and traceable. Personally, I've worked with MongoDB for over 10 years. I've used it in all the products I built and I'm still using it. It's behind my course portal for businesses, we have an analytics dashboard within ArjanCodes that uses MongoDB, and I have several other tools that we use internally as well. I don't host this myself; I use MongoDB Atlas, which removes most of the operational overhead for me. There are backups, it's secure, it's easy to scale, and you even get a free sandbox database that is actually already pretty good. If you want to try MongoDB for yourself, check out the link in the video description.

Step 4: Add a Projector

Like I said, if we have these two different collections, we need to make a projection from the write side to the read side. And of course, like a Swedish chef, I have already prepared this. Here's a project ticket function that takes a database and the ticket ID. It retrieves the document from the write side and then prepares a new document for reading. That document contains the preview, so we compute it only when we do this projection, and whether or not the ticket has a note. Finally, it inserts this projection into the reads collection. So whenever we create or update a ticket, we need to run this projection logic. In the tickets endpoint where we create a ticket, we not only need to create it, we also need to project it. The projection needs the database and our ticket ID. The same goes for updating the status and, of course, adding an agent note: there we also need to do this projection. So now every time we make a change, whether we create a ticket or update it, we also update our projection.

Now I can go down into the read endpoints, like getting a ticket, and switch to the reads collection. In the list tickets endpoint I can also switch to the reads collection. But now I don't have to do these computations anymore: I can directly refer to the preview, because that's already part of the projected document, and the same goes for has_note. That means I can also update the query's field projection: we need the subject, status, updated_at, the preview and has_note, so I only retrieve those and no longer fetch the full message and note. When you have a lot of tickets, this is potentially a lot less data that you're retrieving from the database.
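A projector along these lines can be sketched with two dictionaries standing in for the write and read collections. The function and field names (`project_ticket`, `make_preview`, `agent_note`, the 40-character preview) are assumptions for illustration. With pymongo, an upserting `replace_one` keyed on the shared ID would play the role of the dictionary assignment below, so that re-projecting after an update overwrites the earlier read document.

```python
from typing import Optional

PREVIEW_LENGTH = 40  # assumed cut-off; the video does not state one


def make_preview(message: str, length: int = PREVIEW_LENGTH) -> str:
    text = message.strip()
    return text if len(text) <= length else text[:length].rstrip() + "..."


def project_ticket(commands: dict, reads: dict, ticket_id: str) -> None:
    """Rebuild the read document for one ticket from its write document."""
    source = commands.get(ticket_id)
    if source is None:
        raise ValueError("ticket not found")
    # Same ID on both sides; assignment overwrites any earlier projection.
    reads[ticket_id] = {
        "subject": source["subject"],
        "status": source["status"],
        "updated_at": source["updated_at"],
        "preview": make_preview(source["message"]),  # derived once, at write time
        "has_note": bool(source.get("agent_note")),
    }


def list_tickets(reads: dict, has_note: Optional[bool] = None) -> list[dict]:
    """Query side: reads only the projected fields, computes nothing."""
    rows = [{"id": tid, **doc} for tid, doc in reads.items()]
    if has_note is not None:
        rows = [row for row in rows if row["has_note"] == has_note]
    return rows


# Hypothetical write document; updated_at is a plain counter to keep this short.
commands = {
    "t1": {
        "subject": "Login fails",
        "message": "I cannot log in since this morning.",
        "status": "open",
        "agent_note": None,
        "updated_at": 1,
    }
}
reads: dict = {}

project_ticket(commands, reads, "t1")        # after creating the ticket
commands["t1"]["agent_note"] = "Asked for browser version"
commands["t1"]["updated_at"] = 2
project_ticket(commands, reads, "t1")        # after adding a note
```

Because the read document never contains the full message or note, the list query has nothing to strip and nothing to derive.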
I'm running the FastAPI app locally on my machine, and it connects to a local MongoDB as well. Let's do a few tests. Here's a curl request to create a ticket with a subject and a message, and we get this ID back. Next I send a curl request to update the status to triaged. Now we can check that the read model actually works: I get the ticket with this particular ID, and you can see that the status is indeed triaged. Similarly, we can check the listing of all the tickets. As you can see, there are a couple of tickets here that I already added in some earlier tests. And here you can see the projected fields, taken from the read model, so whether a ticket has a note or not is right in here. Let's do another test and add a note to one of these tickets. That has been changed, so let's see what happens with our tickets endpoint. Again we have a listing of all the tickets, and we still have the preview message, but now has_note has been set to true, because that happens in the projection.

So now we have cleanly separated writing to the database, which then triggers a projection, from reading from it. In this demo, of course, I do all of this synchronously. If you look at creating a ticket, for example, we create it and then immediately project it. That's to keep things simple here, but conceptually the read side is still allowed to lag behind the write side. In real systems this often happens asynchronously, so the projection happens after you've already sent the response. And in real systems you probably also need to handle failures during projection, for example by retrying later or replaying.
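The video keeps projection synchronous. As a rough illustration of the asynchronous variant mentioned above, the sketch below queues ticket IDs after a write and projects them later, re-queuing failures a limited number of times. Everything here (`ProjectionQueue`, `max_attempts`, the in-memory dictionaries) is hypothetical; a production system would more likely use a background worker or a message broker.

```python
from collections import deque
from typing import Callable


class ProjectionQueue:
    """Defers projection so that writes can return before the read model is updated."""

    def __init__(self, project: Callable[[str], None], max_attempts: int = 3) -> None:
        self._project = project
        self._pending: deque = deque()  # items are (ticket_id, failed_attempts)
        self._max_attempts = max_attempts
        self.failed: list = []          # IDs that exhausted their attempts

    def enqueue(self, ticket_id: str) -> None:
        self._pending.append((ticket_id, 0))

    def drain(self) -> int:
        """Process what is queued right now; return the number of successes."""
        done = 0
        for _ in range(len(self._pending)):
            ticket_id, attempts = self._pending.popleft()
            try:
                self._project(ticket_id)
                done += 1
            except Exception:
                if attempts + 1 < self._max_attempts:
                    # Retry on a later drain instead of failing the write.
                    self._pending.append((ticket_id, attempts + 1))
                else:
                    self.failed.append(ticket_id)
        return done


# In-memory stand-ins for the write and read collections.
commands: dict = {}
reads: dict = {}


def project(ticket_id: str) -> None:
    reads[ticket_id] = {"status": commands[ticket_id]["status"]}


queue = ProjectionQueue(project)

commands["t1"] = {"status": "open"}   # the write succeeds immediately
queue.enqueue("t1")
read_model_is_stale = "t1" not in reads  # the read side lags behind here

queue.drain()                            # later, e.g. in a background task
status_after_drain = reads["t1"]["status"]
```

Between `enqueue` and `drain`, a list or dashboard query would not see the new ticket yet, which is the brief window of staleness that eventual consistency implies.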

What Improves After the Refactor

What has happened after all these changes? Well, first, the list endpoint no longer fetches the full messages. It no longer computes the derived fields. It can filter on things like has_note cheaply. That's a really nice advantage of the CQRS architecture. Another nice thing is that you can now do all sorts of analysis and aggregates without affecting the write part of the system. For example, here I have a dashboard endpoint that does some grouping based on whether tickets are open, triaged or closed, and of course I do this on the read collection. If I retrieve this data, you see we get the aggregate result. This only works with the reads collection; the write collection is not affected by it, and that is exactly what we want. You can imagine that, to support the dashboard more easily, we could also create these projections automatically without anyone requesting them, so that the dashboard always gets a more or less recent projection of the data. That could, for example, run once an hour to get the latest information, and then every time you load your dashboard you simply get that summary information, the aggregate information, without having to access any of the tickets at all. So there are a lot of things you can do on the read side now to evolve it without affecting how tickets are actually being stored. Of course, in this case I'm using a single MongoDB database with two different collections, so CPU, memory and disk are still shared. But architecturally, the write side stays stable and the read side can evolve with the needs of the user interface. And later on you can split this up further: you can create a completely separate database for the read side, or even use different types of databases depending on what kind of operations you need to perform.
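The dashboard grouping can be sketched as follows. `STATUS_COUNTS_PIPELINE` shows what a MongoDB aggregation pipeline for counting documents per status might look like. `status_counts` is an in-memory equivalent over read documents, so the example runs without a database. Both names are hypothetical and not taken from the repository.

```python
from collections import Counter
from typing import Iterable

STATUSES = ("open", "triaged", "closed")

# A pipeline of this shape could be passed to an aggregate call on the
# reads collection: one output document per status with its count.
STATUS_COUNTS_PIPELINE = [
    {"$group": {"_id": "$status", "count": {"$sum": 1}}},
]


def status_counts(read_docs: Iterable[dict]) -> dict:
    """In-memory equivalent of the pipeline above, with zeros for unused statuses."""
    counts = Counter(doc["status"] for doc in read_docs)
    return {status: counts.get(status, 0) for status in STATUSES}


# Hypothetical read-model documents.
read_docs = [
    {"status": "open", "has_note": False},
    {"status": "open", "has_note": True},
    {"status": "triaged", "has_note": True},
]
summary = status_counts(read_docs)
```

A scheduled job could store `summary` as its own small document, so that loading the dashboard reads one precomputed record instead of scanning tickets.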

When to Use CQRS

Now, when do you use CQRS, and when should you avoid it? First, it starts to make sense when reading and writing start to diverge, when their needs are going to be different: for example, if you have dashboards or aggregates, or you need derived fields that require some computation. It's also a great way to protect business logic, in the sense that if you change the ticket structure, for example, that doesn't necessarily have to affect the read side of things. So that's another nice thing you get as well.

When Not to Use CQRS

When do you not use this? Well, if you just have a small CRUD app where reading and writing are not that different, you don't want the added complexity. Basically, it's the opposite of the reasons why you would use it. In practice, I would say start simple. Don't immediately start using CQRS; only refactor towards this type of architecture when the pain is real, when you actually need it. If you are in that situation, where reading and writing become very different and the operations are complex, CQRS is a really great architecture.

Final Thoughts

Like I said, in this example preview and has_note are pretty small, but that's exactly how real systems start drifting: by adding these small things, which may grow into more and more complexity later on. But I'd love to hear from you. Have you implemented CQRS yourself? Have you used it? What have you learned from that? Let me know in the comments. If you enjoyed this video, check out my video on the specification pattern. It's another great and, well, kind of scary way to keep your business rules under control. The video is right here. Thanks again to MongoDB for sponsoring this video, thank you for watching, and see you next time.
