Peter Bell


Stream Processing (Part 2)

The second half of our Stream introduction. If you haven’t read the first half, I recommend you do so before going forward. In this post, we cover:

  • reduce
  • flatMap and nested queries
  • Using chunk to improve nested query performance
  • Creating custom Streams

reduce

Conceptually, a Stream can be viewed as a collection containing the results of a query. Sometimes, however, we want to boil down (or reduce) this collection of records into a single value. In other words: out of many, one. This is similar to aggregate functions such as COUNT and MAX in SQL. reduce is a Stream terminal function that aggregates data using a reducer function and an initial value. Here’s a quick example using JavaScript’s Array (which also has a reduce method):

var add = function (accumulator, currentValue) {
    return accumulator + currentValue;
};

var sum = [1, 2, 3].reduce(add, 0); // 6

// which is the same as...
sum = add(0, 1);
sum = add(sum, 2);
sum = add(sum, 3); // 6

Here we’re starting with an Array of numbers. reduce takes a reducer function (add) and an initial value that starts the reducing process. The reducer function is then called on each value in the Array (or Stream), with the initial value used as the accumulator the first time. This process is like a snowball: the reducer function’s output becomes its input the next time it’s called, gradually building up to a final result.
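
To make the snowball concrete, here is a minimal sketch of what a reduce loop does, written as a plain JavaScript function over an Array. reduceArray is a hypothetical helper for illustration only; it is not how GlideQuery’s Stream implements reduce.

```javascript
// Hypothetical helper: a hand-rolled reduce over an Array, for illustration only.
function reduceArray(items, reducer, initialValue) {
    var accumulator = initialValue; // the first call receives the initial value
    for (var i = 0; i < items.length; i++) {
        // each call's output becomes the next call's accumulator
        accumulator = reducer(accumulator, items[i]);
    }
    return accumulator;
}

var add = function (accumulator, currentValue) {
    return accumulator + currentValue;
};

var total = reduceArray([1, 2, 3], add, 0); // 6
var fromEmpty = reduceArray([], add, 0);    // 0: with no items, the initial value comes straight back
```

With no items, the initial value is returned untouched. JavaScript’s built-in Array reduce, by contrast, throws a TypeError on an empty Array when no initial value is given, which is a good reason to always pass one.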

How does this look with GlideQuery’s Stream?

var longestName = new GlideQuery('sys_user')
    .whereNotNull('name')
    .select('name')
    .map(function (user) { return user.name; })
    .reduce(function (acc, name) {
        return name.length > acc.length ? name : acc;
    }, '');

Here we’re calculating the longest name in sys_user. We start with an empty string as our initial value, and our reducer function returns the current name only if it’s longer than the accumulator; otherwise it returns the accumulator unchanged. If we give the reducer function (the function passed to reduce) a name and call it ourselves, we can see how reduce calculates the longest name in the table:

var getLongerName = function (acc, name) {
    return name.length > acc.length ? name : acc;
};

var longestName = getLongerName('', 'Jason'); // longestName === 'Jason'
longestName = getLongerName('Jason', 'Abraham'); // longestName === 'Abraham'
longestName = getLongerName('Abraham', 'Mia'); // longestName === 'Abraham'

// or with nested calls...

longestName = getLongerName(getLongerName(getLongerName('', 'Jason'), 'Abraham'), 'Mia'); // 'Abraham'

The above example only illustrates how reduce calls the given reducer function to produce a single value (such as an object, string, or number) from a Stream of many values. Let’s look at another example:

var usersByCountry = new GlideQuery('sys_user')
    .select('country')
    .reduce(function (acc, user) {
        var country = user.country || 'Unknown';
        acc[country] = acc[country] || [];
        acc[country].push(user.sys_id);
        return acc;
    }, {});

// {
//     "Unknown": [
//         "005d500b536073005e0addeeff7b12f4",
//         "01b8332e61952010fa9b9c05b7356b18",
//         "02826bf03710200044e0bfc8bcbe5d3f"
//     ],
//     "US": [
//         "38e3383a61d52010fa9b9c05b7356bd0",
//         "7164707a61192010fa9b9c05b7356b5b"
//     ],
//     "Canada": [
//         ...
//     ]
// }

In this sample, instead of using a simple type like a string or number as our aggregated value, we’re reducing all the sys_user records into a single object. The keys are country names, and each value is an array of the sys_ids of users in that country. This pattern is common in production code.
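
If you don’t have an instance handy, the same grouping pattern can be tried on a plain JavaScript Array. The records below are made-up sample data standing in for the result of select('country'); their shape (sys_id and country) is an assumption that mirrors the fields used above.

```javascript
// Made-up sample records standing in for sys_user rows (hypothetical data).
var users = [
    { sys_id: 'a1', country: 'US' },
    { sys_id: 'b2', country: '' },
    { sys_id: 'c3', country: 'Canada' },
    { sys_id: 'd4', country: 'US' }
];

var usersByCountry = users.reduce(function (acc, user) {
    var country = user.country || 'Unknown'; // empty or missing country falls back to 'Unknown'
    acc[country] = acc[country] || [];       // create the bucket the first time a country appears
    acc[country].push(user.sys_id);
    return acc;                              // the same object is threaded through every call
}, {});

// usersByCountry:
// { US: ['a1', 'd4'], Unknown: ['b2'], Canada: ['c3'] }
```

Mutating and returning the same accumulator object, rather than building a new one on each call, keeps the work proportional to the number of records.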

In short, reduce is a bit more complicated than map or filter, but it is powerful. Warning: avoid using reduce to calculate the count, max, min, average, or sum of a value if you can use GlideQuery’s built-in aggregate functions instead. These are performed at the SQL level and so are far more performant. When they don’t fit your use case, however, reduce is a great tool to keep in your back pocket.

flatMap

We discussed map in the previous article. flatMap behaves very much like map, with two differences:

  1. The function passed to flatMap must itself return a Stream
  2. flatMap unwraps (or flattens) each returned Stream, so the resulting Stream contains the inner Streams’ items directly rather than the Streams themselves
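
The difference is easiest to see with plain Arrays. The sketch below uses a hypothetical flatMapArray helper (not part of GlideQuery) and made-up data mapping user IDs to task IDs. It shows that map leaves one nested collection per input, while flatMap splices the inner results into a single flat sequence.

```javascript
// Made-up lookup: user ID -> that user's task IDs (hypothetical data).
var tasksByUser = {
    u1: ['t1', 't2'],
    u2: [],
    u3: ['t3']
};

var userIds = ['u1', 'u2', 'u3'];
var getTasks = function (userId) { return tasksByUser[userId]; };

// map: one inner Array per user, so the result is nested
var nested = userIds.map(getTasks); // [['t1', 't2'], [], ['t3']]

// Hypothetical helper: map each item to an Array, then append that Array's items in order
function flatMapArray(items, fn) {
    var result = [];
    items.forEach(function (item) {
        fn(item).forEach(function (inner) { result.push(inner); });
    });
    return result;
}

var flat = flatMapArray(userIds, getTasks); // ['t1', 't2', 't3']
```

A user with no tasks (u2 here) simply contributes nothing to the flattened result. Newer JavaScript engines also ship a built-in Array.prototype.flatMap (ES2019); the helper spells the idea out by hand in the ES5 style used throughout this post.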

Here’s a practical example using GlideQuery:

var getTasksAssignedToGroup = function (groupId) {
    return new GlideQuery('sys_user_grmember')
        .where('group', groupId)
        .select('user')
        .map(function (grmember) { return grmember.user; })
        .flatMap(function (userId) {
            return new GlideQuery('task')
                .where('active', true)
                .where('assigned_to', userId)
                .select()
                .map(function (task) { return task.sys_id; });
        })
        .reduce(function (acc, taskId) {
            return acc.concat(taskId);
        }, []);
};

// Output when called on a group (array of task IDs):
// [
//     "46ee0924a9fe198100f1cf78c198454a",
//     "06224a0261112010fa9b9c05b7356bb7",
//     "15e33b87611d2010fa9b9c05b7356b75",
//     ...
// ]

You can see here that we passed flatMap a function that returns a Stream of task IDs for a given user. flatMap then returns a single Stream of task IDs. If we used map instead, we’d have a Stream of Streams of task IDs, which is probably not what we want. In Java generics nomenclature, map would return Stream<Stream<String>>, whereas flatMap flattens it to simply Stream<String>. If this is still unclear, just remember that flatMap is great when you need to do nested queries like the one above.

You’ll notice I use reduce at the end of the query to convert the results into a regular JavaScript Array. This can be unwise if the Array might end up holding many records. Remember that, unlike a Stream, which produces its items one at a time, an Array must hold all of its items in memory at once. If you think you may have too many records, skip this last step and process the Stream differently; for example, you could use forEach to do something with each task ID.

There’s an issue with the above query: the dreaded N+1 problem. For every user in the group, we’re running another query. If a group has 1000 users, we execute 1001 queries in total (1 for the sys_user_grmember table + 1000 task queries). Travis Toulson discusses this issue with GlideRecord on his CodeCreative blog.

There is a way we can minimize this issue with little extra work, thanks to Stream’s chunk method. Let’s see:

chunk

var getTasksAssignedToGroup = function (groupId) {
    return new GlideQuery('sys_user_grmember')
        .where('group', groupId)
        .select('user')
        .map(function (grmember) { return grmember.user; })
        .chunk(5) // batch user IDs into small arrays of 5
        .flatMap(function (userIds) {
            return new GlideQuery('task')
                .where('active', true)
                .where('assigned_to', 'IN', userIds) // Use 'IN' operator
                .select()
                .map(function (task) { return task.sys_id; });
        })
        .reduce(function (acc, taskId) {
            return acc.concat(taskId);
        }, []);
};

You’ll notice I added the chunk call after the initial map call. What does this do? Before calling chunk, I had a Stream of user IDs. chunk simply batches these into Arrays of up to 5 user IDs each. chunk is an intermediate method, meaning it’s lazy and returns a Stream of JavaScript Arrays. In Java nomenclature, chunk converts a Stream<String> into a Stream<List<String>>.
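
Here is a minimal sketch of the batching that chunk performs, written as a hypothetical chunkArray helper over a plain Array. GlideQuery’s chunk works lazily on a Stream, whereas this eager version builds every batch up front. It is only meant to show the shape of the output; in this sketch the final batch holds whatever is left over.

```javascript
// Hypothetical helper: split an Array into consecutive batches of at most `size` items.
function chunkArray(items, size) {
    var batches = [];
    for (var i = 0; i < items.length; i += size) {
        // slice stops at the end of the Array, so the last batch may be shorter
        batches.push(items.slice(i, i + size));
    }
    return batches;
}

var batches = chunkArray(['u1', 'u2', 'u3', 'u4', 'u5', 'u6', 'u7'], 5);
// [['u1', 'u2', 'u3', 'u4', 'u5'], ['u6', 'u7']]
```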

The reason this is useful lies in the nested GlideQuery call on the task table. Note that the where clause now uses the “IN” operator, so each task query covers up to five users at once. This means it now executes only one-fifth as many task queries as before (for 1000 users, 200 task queries instead of 1000). Your mileage may vary, but I’ve found tremendous improvements to performance when using this pattern in certain cases.
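
The saving is easy to estimate. Assuming one query for sys_user_grmember plus one task query per batch, the total is 1 + ceil(users / batchSize); with a batch size of 1 this is the original N+1. The hypothetical countQueries function below only does that arithmetic; it does not measure anything on an instance.

```javascript
// Hypothetical helper: estimated number of queries for the nested pattern above.
// One query for group membership, plus one task query per batch of users.
function countQueries(userCount, batchSize) {
    return 1 + Math.ceil(userCount / batchSize);
}

countQueries(1000, 1); // 1001: no chunking, the N+1 case
countQueries(1000, 5); // 201: chunk(5)
countQueries(1003, 5); // 202: the 3 leftover users still need one more query
```

Fewer round trips is only part of the picture: each IN query returns more rows, so the real-world gain depends on your data.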

Using Stream without GlideQuery

Although Stream comes with GlideQuery, it can actually be used by itself. What do you need to do this? Just a function. Let’s look at a simple example:

new Stream(Math.random)
    .map(function (n) { return n * 100; })
    .map(Math.floor)
    .limit(3) // Don't forget to limit!
    .forEach(gs.info);

// 37
// 12
// 88

Here we’re creating our own Stream, which contains whole numbers between 0 and 99 (inclusive). The only thing the Stream constructor needs is a generator function to call. In this case, we used Math.random, which returns a random number greater than or equal to 0 and less than 1. Streams call the generator function every time a terminal function wants the next item in the Stream. Streams can be infinitely long, which is why, if we forget to call limit, we can get stuck in an infinite loop! There is one special reserved value that signals the end of a Stream when the generator function returns it: Stream.END. So, for example, if you want a finite Stream, you could do something like:

var getCounter = function (count) {
    var i = 0;
    return function () {
        if (i >= count) {
            return Stream.END;
        }
        return i++;
    };
};

var counterFunction = getCounter(20);
new Stream(counterFunction).forEach(gs.info);

// 0
// 1
// 2
// ...
// 19
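
To see why a forgotten limit loops forever, it helps to picture what a terminal function does: it keeps asking the generator for the next value until it sees the end marker. The sketch below is a hypothetical, stripped-down model of that pull loop, not GlideQuery’s actual Stream implementation; END and collect are made-up names, and maxItems plays the role of limit.

```javascript
// Hypothetical end-of-stream marker; a unique object cannot collide with real values.
var END = {};

// Hypothetical terminal function: pull values from `generator` until it returns END
// or until `maxItems` values have been collected (the role limit plays above).
function collect(generator, maxItems) {
    var items = [];
    while (items.length < maxItems) {
        var next = generator();
        if (next === END) {
            break; // the generator signalled that it has no more values
        }
        items.push(next);
    }
    return items;
}

// Same counter idea as getCounter above, but returning the local END marker.
var makeCounter = function (count) {
    var i = 0;
    return function () {
        return i >= count ? END : i++;
    };
};

collect(makeCounter(3), Infinity); // [0, 1, 2]: stops at END
collect(Math.random, 2).length;    // 2: an endless generator stops only because of maxItems
// collect(Math.random, Infinity) would never return
```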

This is a trivial example, so let’s try leveraging Stream in a more real-world scenario, such as fetching data from a third-party API:

var getSearchResults = function (term, page, limit) {
    var offset = limit * page;
    var encodedTerm = GlideStringUtil.urlEncode(term);
    var queryParams = '?offset=' + offset + '&limit=' + limit + '&term=' + encodedTerm;
    var endpoint = 'https://itunes.apple.com/search' + queryParams;
    var client = new sn_ws.RESTMessageV2();
    client.setEndpoint(endpoint);
    client.setHttpMethod('GET');
    var response = client.execute();
    var body = response.getBody();
    return body ? JSON.parse(body) : null;
};

var search = function (term) {
    var page = 0;
    var limit = 50;
    var results = null;
    var resultsIndex = 0;

    return new Stream(function () {
        if (!results || resultsIndex >= limit) {
            var payload = getSearchResults(term, page++, limit);
            if (!payload || payload.resultCount < 1) {
                return Stream.END;
            }
            results = payload.results;
            resultsIndex = 0;
        }

        return resultsIndex >= results.length && results.length < limit
            ? Stream.END
            : results[resultsIndex++];
    });
};

var collections = search('Jimi Hendrix')
    .filter(function (result) { return result.collectionPrice < 10.00; })
    .reduce(function (acc, result) {
        acc[result.collectionName] = result.collectionPrice;
        return acc;
    }, {});

// {
//     "Are You Experienced (Deluxe Version)": 9.99,
//     "Electric Ladyland": 9.99,
//     "Axis: Bold As Love": 9.99,
//     "Live At Monterey": 7.99,
//     "What My Mother Taught Me - EP": 3.96,
//     "Rattle and Hum": 9.99,
//     ...
// }

We’ve abstracted pagination away from callers of search. As far as they’re concerned, search simply returns a Stream of search results from the iTunes Search API. This Stream can be filtered, mapped, reduced, limited, etc. like any other Stream.
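
The pagination trick does not depend on REST at all. Here is an offline sketch that replaces getSearchResults with a hypothetical in-memory fetchPage(page, limit) over made-up data, and uses a hypothetical END marker in place of Stream.END, so the generator logic can be exercised anywhere. It walks one page at a time and only fetches the next page once the current one is used up.

```javascript
// Hypothetical end-of-stream marker standing in for Stream.END.
var END = {};

// Made-up data source: 7 results served in pages, standing in for the remote API.
var allResults = ['r0', 'r1', 'r2', 'r3', 'r4', 'r5', 'r6'];
var fetchCount = 0;
function fetchPage(page, limit) {
    fetchCount++;
    var start = page * limit;
    return allResults.slice(start, start + limit); // empty Array once we run past the end
}

// Returns a generator that hides pagination: callers just ask for the next result.
function makePagedGenerator(limit) {
    var page = 0;
    var results = [];
    var index = 0;
    var done = false;
    return function () {
        if (done) {
            return END;
        }
        if (index >= results.length) {
            // current page used up: a short page means there is nothing left to fetch
            if (page > 0 && results.length < limit) {
                done = true;
                return END;
            }
            results = fetchPage(page++, limit);
            index = 0;
            if (results.length === 0) {
                done = true;
                return END;
            }
        }
        return results[index++];
    };
}

// Drain the generator (no limit needed: it is finite).
var next = makePagedGenerator(3);
var seen = [];
for (var value = next(); value !== END; value = next()) {
    seen.push(value);
}
// seen: ['r0', 'r1', 'r2', 'r3', 'r4', 'r5', 'r6'], fetched with 3 page requests
```

Like search, the generator stops on an empty page or after a short final page, so it never requests a page it already knows is empty.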

By creating your own Stream, instead of iterating through values in a loop, you can leverage all the handy methods Stream has to offer. Because the requirements to use Stream are low (just a function that returns something), it’s fairly easy to get going once you understand the basics.

Conclusion

reduce is a powerful part of Stream that aggregates many values into a single value. flatMap is the preferred way of doing nested queries, though nested queries should be used only when necessary, since they can hurt performance. chunk can alleviate some of that performance pain when nested queries are unavoidable. Finally, Stream can be harnessed to process data from sources other than GlideQuery.

