Message Broker: Into String

Strings in most native or performance focused languages tend to present a fair amount of complexity and Rust is no exception to this. There are cases where you have a struct that needs to have a name, such as:

struct ServiceHandle {
    name: String,
}

The first ServiceHandle::new() function I would typically write when first learning Rust would look like this:

fn new(name: String) -> ServiceHandle {
    // other init stuff
    ServiceHandle { name: name }
}

In the real world I generally have a String to pass to ServiceHandle::new(String) so this API works well. But when I’m writing tests for my code, I want to pass hard coded values with type &'static str. In order to do that, I have to call one of the conversion functions.

ServiceHandle::new("listener".into());
ServiceHandle::new("listener".to_owned());
ServiceHandle::new(String::from("listener"));

If I change the signature to something like:

fn new(name: &str) -> ServiceHandle {
    // other init stuff
    ServiceHandle { name: name.to_owned() }
}

Then I have to remember to prefix String with & when passing to the function. Another possibility is to from_string and from_str which is what I started to rely on next. Then I don’t have to remember as much, just use the right one for the right type.

fn from_str(name: &str) -> ServiceHandle {
    // skip init stuff, let from_string do it
    ServiceHandle::from_string(name.to_owned())
}

fn from_string(name: String) -> ServiceHandle {
    // other init stuff
    ServiceHandle { name: name }
}

This gets me into a better state where it is easy to remember what string type it takes, but it feels clumsy. Rust provides the From and Into traits that types can implement to enable more generic coding. They are as their name implies a way to automatically change types. They are “reflexive” which means that if one of them is implemented the other can use it. For example, impl From<A> for B would allow you to write a function like fn thing<T: Into<B>>(arg: T) which could be called like thing(A {}).

So the next iteration of my ServiceHandle::new() went generic:

fn new<S: Into<String>>(name: S) -> ServiceHandle {
    ServiceHandle { name: name.into() }
}

This allows calling with String, &str or several other types that can automatically be converted into a String. Making writing testing code with &'static str simple, while dynamically generated String objects still a first class citizen.

Message Broker: Goals and (De)Motivations

Recently, I read a twitter rant that described message brokers as poor combination load balancer, database, and service discovery tools. It hit me hard since I’d just spent a week diving into writing my own message broker. While I had my dislikes of brokers, I think they are handy tools. The tweeter stated that many of these things should be built into the services. The goal of which to keep the heavy work out of the center of the system. Message brokers doing the opposite when used as a central bus.

Having this description of the problem space is turning out to be nice. It gives me some different framing for the various parts of the message broker I’ll be building and the underlying needs. It also pointed out a heavy flaw that message brokers as a central bus can cause trouble in some systems. While that twitter rant dismayed me at first, I now feel even more energized in building this tool.

This framing of load balancer, database, and service discovery reminds me to go read up on that tech as well. Sourcing papers for those problems while looking into queuing related things. I can acknowledge and make sure these subproblems get solved well enough for my intended scale. That will be a key part of my design going forward, keeping my decisions favoring small to medium scale. I’ve seen message brokers work well in those scenarios and want to make an even better one of those.

This doesn’t mean one couldn’t use the broker in a larger scale operation. But, I’m architecting it to encourage deliberate clustering beyond medium scale. Clustering acknowledges the fact that there are usually groups of services that are able to meet a work request without speaking outside of their group except for one or two edges. What I hope to discover as part of the development process is how to encourage this. Whether documenting and creating examples will be enough, or if I’ll need more core features.

I think keeping the message broker light weight will be instrumental in encouraging clustering. If the message broker is heavy, folks wouldn’t want to run too many instances. If it requires a lot of tuning to be useful, folks will want to only tune it once as a central bus. Side note: as I typed this I realized this is why Redis is so good.

Among the lofty design and architecture goals I want to mention my motivations and put the goals in perspective. This project’s main goal is to be a learning project. I want to better understand the internals of message buses. Most green field backend projects will be utilizing a message bus and smaller services. Understanding the internals of the message bus and keeping them in mind will let me design better services.

I also want to build a complex, performance focused, realistic piece of software in Rust. I find the language fun to work with and writing my own thread orchestration that is safe is delightful. As I build up the basics in the broker and client, I’m learning a lot of practical Rust skills. Like many others writing and coding in Rust in their free time, I’m hoping this will help encourage more jobs writing Rust. If I’m lucky enough, I’ll get to secure one of those jobs.

Message Broker: Channel Naming

I’ve started building a message broker as a learning project. There are several out there such a RabbitMQ, Kafka, Redis’ pub/sub layer, and some brokerless message queue solutions like 0mq. Having used many of them over the years and studying the topic both from the classic “Enterprise Integration” side and the more modern/agile “Microservices” side, I figured I’d try my hand at implementing one.

In this blog post, I’m going to go over how I designed and implemented channel naming. Channel names fill the role of data descriptor used by the publishers and the role of query language for subscribers. This turned out to be a delightful series of problems to explore. Questions such as “how do people use them”, “what features are expected”, “what limitations are common” came up. Realizing that my message broker is essentially providing a naming framework that will have at least some opinion, I needed to ask “are there practices I want to encourage or discourage” and recognize my influence.

In almost all message queue systems, channels have a string based name. This name may be broken up by delimiters, and that delimiter is sometimes chosen by the client or in a config on the broker. Most support the ability to wild card parts of the channel when subscribing. Sometimes the wild card is purely string based, in delimited channels often the wild card is done by chunk. Wild cards are sometimes restricted in what positions they can be in, almost always allowed at the end, sometimes in the middle and sometimes disallowed at the start.

When deciding between these I also wanted to keep in mind what solutions are relatively easy to code, straightforward to debug, and allow for acceptable performance. Not allowing wild cards at all means that simple string comparison works, but omits a common feature. Using simple strings and a well known text query setup like regular expressions would give huge amounts of flexibility in selecting what messages you’d like, but comes with a large performance cost, libraries, and is much harder to spot debug. I decided that since I’m not going to use a common query language, that I’d want to make the names as simple as possible to parse.

A Rope is a data structure I’d heard about a few times and I knew it had something to do with making string operations faster, but I was hazy on the details. Since this is a project without deadlines, I set about reading a paper on them to learn more. Shortly into the paper it was clear this wasn’t the solution for me, ropes are designed for larger bodies of text, manipulating that text in a variety of ways, and making common editor features easier to implement. But it shared that part of the problem was breaking down the text, and part of it was comparing text.

Since channel names are often a hierarchy that uses the delimiter to separate the layers, I split the name using that delimiter into an array. But that then left me with a bunch of smaller strings I had to compare, which seemed much slower than just walking through the name and query once, using the wild cards to skip characters. To avoid a char by char parse on every channel name comparison, I drew on the common native layer practice of hashing strings then comparing the hashes. Hashing has an up front cost of processing the string into a numeric form, but then makes comparisons extremely fast. Since a message’s channel would be used to query for appropriate subscribers, it could be hashed once and compared many times. An unfortunate side effect of hashing the substrings though, I wouldn’t be able to allow partial segment matches. The wild card would be all or nothing in a given position, just a.* no a.b*.

I decided that side effect of only allowing wild cards at the segment layer was ultimately a good thing. While there may be transition times where the feature of partial segment matching would help, it would allow people to break the idea that each segment is a complete chunk of data. This similar reasoning is why the only selectors are exact and wild card, no numeric or alpha only sorts of selection.

After all this research, thinking, note taking, and general meandering thing the computer science fields, I started to implement my solution in Rust. My message broker isn’t actually ready for the channel names yet, as I’m still working on how to manage connections and properly do the various forms of store and forward. But this problem tickled me so I set about solving it anyway. I created a file channel.rs to try building these ideas.

I started with a basic struct with the string form of the name for debugging, and a hashed form of the name for comparisons.

struct Channel {
    raw_name: String,
    hashed_name: Vec<Option<u64>>
}

raw_name is a String so the struct can own the string without any lifetime concerns of a str, this value will mostly be used for debugging or admin purposes. The hashed_name is a Vec so it can be variable sized, currently I have no limit on the number of delimiters you can use. Option is what I used to handle the wild card. If it was Some<u64> then you have a hash to compare. If it was None then it was wild card and you don’t have to compare it. After thinking harder though, I realized that I didn’t want to have the binary Option as my indicator for whether to use a wild card or not. If I added a new type of wild card, for instance, one that allowed any number of segments, I’d have to replace my Option usage everywhere. So instead I’ve preemptively changed to using my own ChannelSegment type like so:

struct Channel {
    raw_name: String,
    hashed_name: Vec<ChannelSegment>
}

enum ChannelSegment {
    Wild,
    Hash(u64)
}

Next, I set about parsing the input. I knew that hard coding strings in tests meant that I wanted to have a from_str variant. People will often have hard coded channel names but there will also be generated ones, and for that allowing a from_string is nice. I also knew I was going to turn it into a String anyway to assign to raw_name. So I did the following to enable both:

pub fn from_str(input: &str) -> Result<Channel, String> {
    Channel::from_string(input.to_owned())
}

pub fn from_string(input: String) -> Result<Channel, String> {
    // parsing code goes here
}

Parsing was a pretty simple matter. Using String.split(char) to get an iterator returning each segment. Then relying on Rust’s pattern matching for cases I specifically cared about matching, like empty string (which I made into an error to prevent mistakes) and "*" which is the wild card character. Building up the hashed name, then returning it.

let mut hashed_name = Vec::new();
for chunk in input.split('.') {
    match chunk {
        "" => {
            return Err("empty entry is invalid".to_owned());
        }
        "*" => {
            hashed_name.push(ChannelSegment::Wild);
        }
        _ => {
            hashed_name.push(ChannelSegment::Hash(calculate_hash(&chunk)));
        }
    }
}
Ok(Channel{
    raw_name: input,
    hashed_name: hashed_name
})

One could easily see using a LRU Cache (Least Recently Used cache that removes the oldest entries when it gets too full) to skip parsing the channel name for the most commonly used channels, but I’m not doing that yet until this proves to be a part that is slowing me down.

To compare between two Channels I added a .matches(&Channel) method. I decided against implementing PartialEq since I wouldn’t be testing for exact match, but instead match when considering wild cards, and most developers expect a more exact match when using ==.

pub fn matches(&self, other: &Channel) -> bool {
    if self.hashed_name.len() != other.hashed_name.len() {
        return false;
    }
    for (a, b) in self.hashed_name.iter().zip(&other.hashed_name) {
        if let (&ChannelSegment::Hash(inner_a), &ChannelSegment::Hash(inner_b)) = (a, b) {
            if inner_a != inner_b {
                return false;
            }
        }
    }
    
    true
}

Since my only wild card is for a single whole segment, I know I can immediately return if they have different lengths. Then I zip the two hashed_name together so I can iterate through them at the same time. In other languages one would commonly just create an integer, increment that and use it to index into both of the sequences at the same time, ensuring to not walk past the end ourselves. In Rust we rely on iterators to give us fast access to our sequences with minimal checking, keeping us safe against others (or ourselves) mutating the sequences in dangerous ways while iterating. Using zip we can create an iterator that walks both sequences at the same time keeping our to our land of safety and speed.

As a side note, the calculate_hash function is one I pulled from the docs but changed a little bit to fit my style better:

fn calculate_hash<T: Hash>(t: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    t.hash(&mut hasher);
    hasher.finish()
}

A feature of Rust I really enjoyed while developing this, was the ability to write tests in the same file as I went. I could quickly write a few examples then make the code pass, focusing on just the higher level parts of the API and not testing the internals so much. Think more “this errors, this does not error” and less “this returns a vector of integers that are ascending…” while you are adding such tests, to make sure you can quickly change out the implementation while the idea keeps working, here are the tests I wrote as I was developing:

#[test]
fn create_basic_channel() {
    Channel::from_str("a.b.c").unwrap();
    Channel::from_str("name").unwrap();
}

#[test]
fn create_with_wildcard() {
    Channel::from_str("a.*.b").unwrap();
    Channel::from_str("*").unwrap();
    Channel::from_str("*.end").unwrap();
    Channel::from_str("start.*").unwrap();
}

#[test]
fn create_invalid() {
    assert!(Channel::from_str(".a.b").is_err());
    assert!(Channel::from_str("c.b.").is_err());
    assert!(Channel::from_str("g.l..b").is_err());
    assert!(Channel::from_str("").is_err());
}

#[test]
fn matches_with_self_exact() {
    let channel = Channel::from_str("a.b.c").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("a").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("dabbling.b.c").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("abba.bobble").unwrap();
    assert!(channel.matches(&channel));
}

#[test]
fn not_matches_exact() {
    let channel_full = Channel::from_str("s.t.r").unwrap();
    let channel_sub = Channel::from_str("s.t").unwrap();
    assert!(!channel_full.matches(&channel_sub));
    assert!(!channel_sub.matches(&channel_full));
}

#[test]
fn matches_with_self_wild() {
    let channel = Channel::from_str("a.*.c").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("*").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("*.b.c").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("abba.*").unwrap();
    assert!(channel.matches(&channel));
}

#[test]
fn matches_wild_card_one_side() {
    let wild_channel = Channel::from_str("alpha.beta.*").unwrap();
    let tame_channel = Channel::from_str("alpha.beta.charlie").unwrap();
    assert!(wild_channel.matches(&tame_channel));
    assert!(tame_channel.matches(&wild_channel));
}

Mostly mundane stuff, values all quickly typed in to ensure the general concept works, and common edge cases in parsing (start, end, middle) are covered. But note this isn’t combinatorially testing this, there isn’t unicode or other trouble points, those sorts of tests can come with time. Being able to add in a new test quickly when I noticed an edge case is easily one of my favorite features of Rust.

I’ll be open sourcing my work in progress soon, but for now it mostly lives in my notebook as some scribbles, and a smattering of mostly disconnected code on my laptop. If you have feedback on this blog post, how I’ve setup channels in my message broker, or generally about my rust code I’d love to hear it in comments below. Before the comments roll in though, I know using Strings for errors is not great, I just don’t have an error system setup in my project yet.