In the final part of this writeup I’ll show you the actual query that does it all and explain how it works.
As a reminder, here is the challenge we want to solve:
Detect and alert when a C-class IP subnet tries to access at least 5 different accounts within an hour, and at least 75% of the accounts touched have never been accessed from this subnet *and* from this USER_AGENT within the last 45 days.
And, as you may remember from Part 1, here’s the basic logic that we need to implement to make it happen:
- Scan the last hour of access log data and find the subnets that tried to access multiple accounts within that hour.
- For each of these accounts, take the username, IP and USER_AGENT and scan the previous 45 days of traffic history to find usernames that have never been touched by this IP/USER_AGENT combo.
- Alert if the number of accounts found is above the threshold.
I’ve spent quite a bit of effort to come up with a single query that does all of the above and in a pretty efficient manner.
The hardest part of the challenge is that the query needs to find events (#1 above) and then run a very custom search for each event against the summary index that we’ve created (#2 above). The icing on the cake is that the query needs to return results only if *no matches* are found by the second part of the search.
This quickly gets mind-boggling and it is a rather interesting puzzle to solve with SPL.
I solved it with a combination of macros and an advanced subsearch. Instead of returning traditional results, the subsearch returns a new, custom-crafted Splunk search query to be executed by the outer search.
I named this approach the Advanced Negative Look Behind (ANLB) query.
An ANLB query has these capabilities:
- ADVANCED: the ability to run a very customized query for each event found within the subsearch.
This is an upgrade from a normal subsearch, where the outer search just applies simple AND or OR logic to the fields returned by the subsearch. A traditional subsearch allows some minor customization via the format command, but we want more than that.
- NEGATIVE: the ability to return a result signifying *no results found*, or conversely to return “no matches” if one or more results are found. This covers the case of: “search for a login event for the given ‘username’ where the IP subnet or the USER_AGENT (or both) match the input”. If *not found*, return a flag showing possibly suspicious account activity for the given username/ip/ua.
- LOOK BEHIND: the ability to search historical data, in our case 45 days’ worth of historical ‘summary_logins’ data, for each matching event within the subsearch.
To implement #1, the ADVANCED part, we will use this pattern:
main search ... [ sub search ... | eval search="very custom search" | fields search ] ...
The point to notice is that Splunk allows a subsearch to return a query to the outer search. When the subsearch returns a field named search, as shown above, the outer search executes its value directly as part of its own query.
To put all the pieces together, it is a good idea to create a few macros.
Macros simplify the main search and, more importantly for us, let us adjust the metrics and thresholds of the alert without editing and re-saving the alert itself. Macros allow us to edit these “remotely”.
To create a macro, navigate to Settings -> Advanced Search -> Search Macros and press the [New] button.
Specify the name of the macro, enter the definition string and then click [Save].
Here are the macros that we will need:
- anlb_latest_hour_timerange
[search index=summary_logins report=summary__logins latest=now | head 1 | eval search="_time>"+tostring(_time-4000)+" _time<="+tostring(_time-0) | fields search]
This macro is one of my favorites when I need to deal with somewhat unstable sources of data. It defines a query that grabs the most recent hour (actually 4000 seconds, or 1 hour, 6 minutes and 40 seconds) of *available* data.
The disadvantage of an “earliest=-1h@h latest=now” definition is that it covers only the last hour counting from the *current time*. If data comes into Splunk periodically (as with my client, where we used IBM Tealeaf as a data source), that query returns no results whenever the incoming data is delayed.
The macro above calculates the last hour of *available*, indexed data, which is a more reliable way to make sure we get data.
- anlb__lookbehind_timerange
earliest=-45d@d latest=-1d@d
This macro defines the time range to search within the summary index.
- anlb_percent_unmatched_threshold
This is the percentage threshold from the definition of the challenge (75% in the challenge above).
- anlb_ua_field
This macro defines the USER_AGENT field. The reason to define it in a separate macro is to work around a glitch I noticed in the way Splunk handles nested subsearches that return search values directly.
I discovered that if the subsearch in the case above uses a field alias (in my specific case ua is an alias for another field), Splunk gets confused. The simple fix is to define the field name in an outside macro instead of using the field name directly.
- anlb_usernames_touched_threshold
This is the trigger threshold for the number of accounts simultaneously accessed by the same subnet (5 in the challenge above).
- iplocation(1)
eval Country="Unknown" | iplocation allfields=1 $field_ip$ | eval Country=if(Country="United States", "USA", Country) | eval Location=Region+" - "+City
This macro fills the final results with Country/Region/City geolocation. This helps to quickly determine whether a given subnet is coming from a high-risk region (such as Russia, China or Africa).
This macro accepts one parameter, the IP address field, and may be called like this:
... | `iplocation(ip)` | table ip, Country, Location
- exclude_whitelist
ip!=22.214.171.124 ip!=126.96.36.199/24
This macro defines the whitelist to exclude from all searches. Here I am showing a sample that excludes a single IP as well as a range of IP addresses defined by a CIDR mask.
These are usually services, aggregators or testing traffic sources that might otherwise trigger alarms but that you know for a fact can never be a source of fraud.
They are specific to each enterprise, and the list can be left empty as well.
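To make two of the macros above more tangible, here is a minimal Python sketch of what they do conceptually: anchoring the time window at the newest *available* event instead of the wall clock, and dropping whitelisted sources. This is only a model of the idea, not Splunk code. The event list, the dates and the whitelist entries are made-up assumptions, and the wall-clock window ignores the @h snapping for simplicity.

```python
import ipaddress
from datetime import datetime, timedelta, timezone

WINDOW_SECONDS = 4000  # same width as anlb_latest_hour_timerange

# Hypothetical whitelist: one single IP and one /24 range (documentation addresses).
WHITELIST = [
    ipaddress.ip_network("203.0.113.7/32"),
    ipaddress.ip_network("198.51.100.0/24"),
]


def latest_available_window(event_times, window_seconds=WINDOW_SECONDS):
    """Return (start, end) anchored at the newest indexed event, or None."""
    if not event_times:
        return None
    end = max(event_times)
    return end - timedelta(seconds=window_seconds), end


def in_window(ts, window):
    # Mirrors the macro's "_time>start _time<=end" comparison.
    start, end = window
    return start < ts <= end


def is_whitelisted(ip, whitelist=WHITELIST):
    addr = ipaddress.ip_address(ip)
    return any(addr in net for net in whitelist)


# Hypothetical feed that is three hours behind the wall clock.
now = datetime(2015, 6, 1, 12, 0, tzinfo=timezone.utc)
events = [
    {"time": now - timedelta(hours=3), "ip": "192.0.2.10"},
    {"time": now - timedelta(hours=3, minutes=10), "ip": "198.51.100.42"},
    {"time": now - timedelta(hours=3, minutes=50), "ip": "203.0.113.7"},
    {"time": now - timedelta(hours=4, minutes=30), "ip": "192.0.2.11"},
]

available = latest_available_window([e["time"] for e in events])
wall_clock = (now - timedelta(hours=1), now)

recent = [e for e in events if in_window(e["time"], available)]
recent_by_clock = [e for e in events if in_window(e["time"], wall_clock)]
kept = [e for e in recent if not is_whitelisted(e["ip"])]

print(len(recent), len(recent_by_clock), [e["ip"] for e in kept])
```

With the delayed feed, the wall-clock window finds nothing, while the window anchored at the newest event still picks up the latest batch; the whitelist then removes the trusted sources from it.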
So the last step is to define an alert.
Navigate to Settings -> Searches, Reports and alerts, click [New] button.
Input the values as follows:
Press the [Save] button when done. This makes the query run every hour (at the 15th minute) and send an alert email if suspicious activity is detected. Suspicious here means that a single IP subnet is trying to access multiple accounts it never tried to access before; see the definition of the challenge above.
After all is said and done, we’re ready for the final ANLB query that makes the magic happen: the value of the “Search” field above.
I’ve modified and simplified it to exclude my client’s business-specific elements and left only the bare, operational Splunk query parts.
So, fasten your seat belt, here it goes:
index=ERRATIC sourcetype=ERRATIC source=ERRATIC
[search index=summary_logins report=summary__logins `anlb_latest_hour_timerange` `exclude_whitelist`
| eval ip_subnet=ip
| rex mode=sed field=ip_subnet "s/^(\d+\.\d+\.\d+\.).*/\1x/g"
| dedup ip_subnet, username_lower, ua
| eventstats dc(username_lower) as num_usernames_touched by ip_subnet
| where num_usernames_touched>=`anlb_usernames_touched_threshold`
| fields ip, ip_subnet, username_lower, ua
| eval search_this=" | append [|stats count AS previous_match_found | eval _time=\"" + _time + "\" | eval username_lower=\"" + username_lower + "\" | eval ip_subnet_orig=\"" + ip_subnet + "\" | eval ip_search=\"" + ip + "\" | eval ua_search=\"" + ua + "\" | appendcols override=1 [search index=summary_logins report=summary__logins `anlb__lookbehind_timerange` username_lower=\"" + username_lower + "\" (ip_subnet=\"" + ip_subnet + "\" OR `anlb_ua_field`=\"" + ua + "\" ) | head 1 | eventstats count AS previous_match_found] ] "
| stats values(search_this) AS all_searches
| eval search=mvjoin(all_searches, " ")
| fields search ]
| eventstats c as total_entries_per_subnet by ip_subnet_orig
| where previous_match_found=0
| eventstats c as unmatched_entries_per_subnet by ip_subnet_orig
| eval percent_unmatched=round(unmatched_entries_per_subnet*100/total_entries_per_subnet, 2)
| where percent_unmatched>=`anlb_percent_unmatched_threshold`
| `iplocation(ip_search)`
| rex mode=sed field=ip_subnet_orig "s/\.x$/.0\/24/g"
| eval show_ratio=tostring(unmatched_entries_per_subnet)+"/"+tostring(total_entries_per_subnet)+" ("+tostring(percent_unmatched)+"%)"
| eval time_string=strftime(_time, "%Y-%m-%d %H:%M:%S")
| sort -unmatched_entries_per_subnet, ip_search, _time
| table ip_subnet_orig, unmatched_entries_per_subnet, show_ratio, Country, Region, City, time_string, ip_search, username_lower, ua_search
| rename ip_subnet_orig AS "Suspicious subnet", time_string AS "Time", ip_search AS "IP", username_lower AS "Username", ua_search AS "Browser User Agent", unmatched_entries_per_subnet AS "Unseen", show_ratio AS "Unseen/Total"
Let’s explain the essential pieces. The query consists mostly of one big subsearch that yields yet another nested subsearch. There is no main query! Splunk, though, insists that there must be one. OK, so I feed the monster this “main query”: index=ERRATIC sourcetype=ERRATIC source=ERRATIC, so that Splunk happily returns zero results and gets down to the subsearch business, where all the magic happens.
[search index=summary_logins report=summary__logins `anlb_latest_hour_timerange` `exclude_whitelist`
| eval ip_subnet=ip
| rex mode=sed field=ip_subnet "s/^(\d+\.\d+\.\d+\.).*/\1x/g"
| dedup ip_subnet, username_lower, ua
| eventstats dc(username_lower) as num_usernames_touched by ip_subnet
| where num_usernames_touched>=`anlb_usernames_touched_threshold`
| fields ip, ip_subnet, username_lower, ua
This fragment finds all subnets that tried to access multiple accounts within the last hour of *available data*. If any are found, it returns the set of events for each attempted access with the fields ip, ip_subnet, username_lower and ua.
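For readers who think more easily in a general-purpose language, the same steps (mask the last octet, dedup, count distinct usernames per subnet, apply the threshold) can be sketched in Python roughly like this. It is an illustration under assumptions, not a replacement for the SPL: the sample events are invented and the threshold of 5 is taken from the challenge definition.

```python
import re
from collections import defaultdict

USERNAMES_TOUCHED_THRESHOLD = 5  # assumed value, from the challenge definition


def to_subnet(ip):
    # Same idea as the rex/sed step: keep three octets, mask the last one.
    return re.sub(r"^(\d+\.\d+\.\d+\.).*", r"\1x", ip)


def candidate_events(events, threshold=USERNAMES_TOUCHED_THRESHOLD):
    # Step 1: dedup on (subnet, username, user agent), keeping the first seen.
    seen = set()
    deduped = []
    for e in events:
        subnet = to_subnet(e["ip"])
        key = (subnet, e["username_lower"], e["ua"])
        if key in seen:
            continue
        seen.add(key)
        deduped.append({**e, "ip_subnet": subnet})

    # Step 2: distinct usernames per subnet (the dc(...) by ip_subnet part).
    users_by_subnet = defaultdict(set)
    for e in deduped:
        users_by_subnet[e["ip_subnet"]].add(e["username_lower"])

    # Step 3: keep only events from subnets at or above the threshold.
    return [e for e in deduped
            if len(users_by_subnet[e["ip_subnet"]]) >= threshold]


# Hypothetical last-hour events: one busy subnet, one quiet subnet.
events = [{"ip": f"192.0.2.{10 + i}", "username_lower": f"user{i}", "ua": "UA-1"}
          for i in range(5)]
events.append({"ip": "192.0.2.99", "username_lower": "user4", "ua": "UA-1"})  # duplicate combo
events += [{"ip": "198.51.100.5", "username_lower": "user0", "ua": "UA-2"},
           {"ip": "198.51.100.6", "username_lower": "user1", "ua": "UA-2"}]

candidates = candidate_events(events)
print(len(candidates), {e["ip_subnet"] for e in candidates})
```

Only the subnet that touched five distinct usernames survives; the duplicate combination and the quiet subnet are dropped.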
| eval search_this="
| append [|stats count AS previous_match_found
| eval _time=\"" + _time + "\"
| eval username_lower=\"" + username_lower + "\"
| eval ip_subnet_orig=\"" + ip_subnet + "\"
| eval ip_search=\"" + ip + "\"
| eval ua_search=\"" + ua + "\"
| appendcols override=1
[search index=summary_logins report=summary__logins `anlb__lookbehind_timerange`
username_lower=\"" + username_lower + "\" (ip_subnet=\"" + ip_subnet + "\" OR `anlb_ua_field`=\"" + ua + "\" )
| head 1
| eventstats count AS previous_match_found] ] "
This is the best piece of the whole query! This is the letter “A” in ANLB, which stands for “ADVANCED”.
For each event found in the previous step, it assembles a custom search query to be run against the summary index of login history. The first fragment of this search appends a row holding the values of username_lower, ip, ip subnet and user agent (named a bit differently), plus a very important field: previous_match_found, with a value of 0 (zero!). The second piece calls the appendcols command with a subsearch that looks into the summary index. If anything is found, previous_match_found becomes 1 (one), thanks to: … | head 1. Please note that this search is defined and assembled here as a pure string. Later on it is returned from the main subsearch to be executed.
If this sounds a bit mind boggling and difficult to grasp – it is! 🙂
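If it helps, the “negative look behind” idea on its own can be modeled in a few lines of Python: start every row with previous_match_found set to 0 and override it only when the history holds a login for the same username from the same subnet or the same user agent. The history records and field values below are made up for illustration, and the function names are mine, not anything from Splunk.

```python
def find_previous_match(event, history):
    """Return the first historical login for the same username that shares
    the event's subnet or user agent, or None when there is none."""
    for past in history:
        same_user = past["username_lower"] == event["username_lower"]
        same_origin = (past["ip_subnet"] == event["ip_subnet"]
                       or past["ua"] == event["ua"])
        if same_user and same_origin:
            return past  # stop at the first hit, like `head 1`
    return None


def flag_event(event, history):
    # Start from the "nothing found" default, as `stats count` does.
    row = {**event, "previous_match_found": 0}
    if find_previous_match(event, history) is not None:
        row["previous_match_found"] = 1  # overridden only when history matches
    return row


# Hypothetical 45-day history.
history = [
    {"username_lower": "alice", "ip_subnet": "192.0.2.x", "ua": "UA-1"},
    {"username_lower": "bob", "ip_subnet": "203.0.113.x", "ua": "UA-2"},
]

known = flag_event(
    {"username_lower": "alice", "ip_subnet": "192.0.2.x", "ua": "UA-9"}, history)
unseen = flag_event(
    {"username_lower": "bob", "ip_subnet": "192.0.2.x", "ua": "UA-9"}, history)

print(known["previous_match_found"], unseen["previous_match_found"])
```

The row that keeps its default of 0 is exactly the one we care about: an account touched from a subnet and user agent that the history has never seen for it.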
| stats values(search_this) AS all_searches
| eval search=mvjoin(all_searches, " ")
| fields search
This fragment glues all the event-specific searches into one big-ass (for lack of a better word!) search string. This is what the main subsearch returns to the main search. If you have lots of matches, the returned string will be humongous, but it still performs pretty well.
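The gluing step can be pictured with a small Python sketch: build one text fragment per event, keep the distinct fragments in sorted order (which is what stats values() does), and join them with spaces. The fragment below is a shortened, hypothetical stand-in for the real search_this string, and like the query itself it does not escape double quotes inside the values.

```python
def build_fragment(event):
    """Return a shortened ' | append [...]' fragment for one event.
    Values are inserted as-is; embedded double quotes are not escaped."""
    user = event["username_lower"]
    subnet = event["ip_subnet"]
    ua = event["ua"]
    return (
        " | append [| stats count AS previous_match_found"
        f' | eval username_lower="{user}"'
        " | appendcols override=1"
        f' [search index=summary_logins username_lower="{user}"'
        f' (ip_subnet="{subnet}" OR ua="{ua}")'
        " | head 1 | eventstats count AS previous_match_found] ]"
    )


def build_search(events):
    # Distinct fragments in sorted order, then one long string.
    fragments = sorted({build_fragment(e) for e in events})
    return " ".join(fragments)


# Hypothetical candidate events, including one exact duplicate.
events = [
    {"username_lower": "alice", "ip_subnet": "192.0.2.x", "ua": "UA-1"},
    {"username_lower": "bob", "ip_subnet": "192.0.2.x", "ua": "UA-1"},
    {"username_lower": "alice", "ip_subnet": "192.0.2.x", "ua": "UA-1"},
]

query = build_search(events)
print(query.count("| append ["), "fragments,", len(query), "characters")
```

The resulting string grows linearly with the number of distinct events, which is why the real query can become very long on a busy hour.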
| eventstats c as total_entries_per_subnet by ip_subnet_orig
| where previous_match_found=0
| eventstats c as unmatched_entries_per_subnet by ip_subnet_orig
| eval percent_unmatched=round(unmatched_entries_per_subnet*100/total_entries_per_subnet, 2)
| where percent_unmatched>=`anlb_percent_unmatched_threshold`
This fragment evaluates the previous_match_found field and checks the thresholds, which are all defined within macros. We are only interested in suspicious matches exceeding the threshold values.
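The arithmetic behind this fragment is easy to check by hand. Here is a minimal Python sketch that follows the same order of operations: count all rows per subnet, keep only the rows with previous_match_found=0, count those per subnet, and compare the percentage with the threshold. The rows are invented and the threshold of 75 is assumed from the challenge definition.

```python
from collections import Counter

PERCENT_UNMATCHED_THRESHOLD = 75  # assumed value, from the challenge definition


def suspicious_rows(rows, threshold=PERCENT_UNMATCHED_THRESHOLD):
    # Total rows per subnet, counted before any filtering.
    total = Counter(r["ip_subnet_orig"] for r in rows)

    # Keep only rows with no historical match, then count again per subnet.
    unmatched_rows = [r for r in rows if r["previous_match_found"] == 0]
    unmatched = Counter(r["ip_subnet_orig"] for r in unmatched_rows)

    result = []
    for r in unmatched_rows:
        subnet = r["ip_subnet_orig"]
        percent = round(unmatched[subnet] * 100 / total[subnet], 2)
        if percent >= threshold:
            result.append({
                **r,
                "unmatched_entries_per_subnet": unmatched[subnet],
                "total_entries_per_subnet": total[subnet],
                "percent_unmatched": percent,
            })
    return result


# Hypothetical rows: 4 of 5 unseen for one subnet, 3 of 5 for the other.
rows = (
    [{"ip_subnet_orig": "192.0.2.x", "previous_match_found": f} for f in (0, 0, 0, 0, 1)]
    + [{"ip_subnet_orig": "198.51.100.x", "previous_match_found": f} for f in (0, 0, 0, 1, 1)]
)

flagged = suspicious_rows(rows)
print(len(flagged), {r["ip_subnet_orig"] for r in flagged})
```

The first subnet reaches 80% unseen and is reported; the second stays at 60% and produces no rows, so it would not trigger the alert.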
The rest of the search defines a few new fields to be returned in the main table, such as the “Unseen/Total” ratio and other explanatory fields that help make the alert email easy to read.
If suspicious activity is detected, the alert email is sent to the email addresses specified in the alert definition. If nothing bad is detected, the search returns zero results and no alert is generated.
Please note that now you may adjust thresholds within the macros without editing main alert definition.
You may also run this search directly to test everything:
From Splunk main page – navigate to your App (in this tutorial I used main Search & Reporting app).
Click “Alerts”, then click on “Open in Search” for “Account Credentials Takeover Detection” alert. This will run the alert query immediately and allow you to see/test actual results.
After testing this approach on a live site, I noticed a pretty low number of false positives considering the amount of traffic the client was getting. The most frequent false positive was the occasional customer who got a new computer and tried to log in to his account from a new place, then forgot his username and tried different variations of it. This triggers a false positive, but from the email report it is always immediately noticeable that the different usernames all share similarities.
Note that the logic does not actually check whether a username was ever used before.
Hopefully this series of articles will help you come up with a working, efficient, Splunk-based anti-fraud solution specific to your enterprise.
Feel free to contact me with any questions (although I cannot guarantee timely replies).
Connect with me on LinkedIn
Gleb Esman is currently working as Senior Product Manager for Security/Anti-fraud solutions at Splunk leading efforts to build next generation security products covering advanced fraud cases across multiple industry verticals.