One of my enterprise clients observed a class of attacks with several distinctive characteristics: an attacker who possessed correct user account credentials would not engage in malicious behavior right away.
Initial activity would involve a degree of reconnaissance and gathering of the future victim’s specific data, such as account balances, trade histories and so on. As a result, the usual “red flags” and risk scoring metrics would not generate any alerts.
However, in many cases such pre-fraudulent activity still carried marks of unusual behavior: session velocity (average number of seconds per hit), session density (number of hits within the session), or both fell outside the normal baseline session patterns typical of the average user of the client’s application.
Abnormally high session velocity (very few seconds per hit) is also a typical pattern of an automated, script-driven session of the kind that both fraudsters and competitors were using to siphon data from the client’s servers.
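To make the two metrics concrete, here is a minimal Python sketch (not part of the Splunk setup) that computes them for one session from a list of hit timestamps. The function name and the sample sessions are made up for illustration; the formula mirrors the `duration/eventcount` expression used in the searches later in this post.

```python
from datetime import datetime

def session_metrics(hit_times):
    """Return (density, velocity) for one session.

    density  = number of hits in the session
    velocity = average seconds per hit (duration / hits); lower means faster
    """
    if not hit_times:
        raise ValueError("a session needs at least one hit")
    ordered = sorted(hit_times)
    duration = (ordered[-1] - ordered[0]).total_seconds()
    density = len(ordered)
    return density, duration / density

# Hypothetical sessions: a person browsing vs. a script scraping pages.
human = [datetime(2024, 1, 8, 10, 0, s) for s in (0, 20, 45)]
script = [datetime(2024, 1, 8, 10, 0, s) for s in range(0, 30)]

print(session_metrics(human))    # 3 hits, 15 seconds per hit
print(session_metrics(script))   # 30 hits, just under 1 second per hit
```

Note that with this definition a "high velocity" session is one with a low seconds-per-hit value, which is why the alert later compares it against a lower bound.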
One possible way to detect these activities would be to calculate average session velocity and density and then use these values to trigger alerts whenever session metrics exceed the thresholds.
The issue here is that, due to the specifics of the client’s business, these averages vary greatly depending on the time of day, the day of the week and the month of the year.
So plugging in fixed, “guessed” threshold values won’t work: it will either generate lots of false positives or miss many suspicious sessions.
The solution I came up with is to have Splunk calculate average session density and session velocity for each 4-hour period. This data is recorded in a summary index and then referenced from the main real-time alert to add an extra dimension to risk scoring.
In addition, to compensate for activity spikes and holidays, I made it automatically average the application-wide session density and velocity values for the current 4-hour window across the same time period of the previous 4 weeks.
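The 4-week averaging idea can be sketched in a few lines of Python. This is only an illustration of the logic, assuming the stored thresholds are available as a dictionary keyed by window start time; the names `baseline` and `history` and the sample values are hypothetical.

```python
from datetime import datetime, timedelta
from statistics import mean

WINDOW = timedelta(hours=4)

def baseline(now, history, weeks=4):
    """Average the stored values whose 4-hour window covered this same
    moment 1..weeks weeks ago. `history` maps window start -> value."""
    samples = []
    for k in range(1, weeks + 1):
        moment = now - timedelta(weeks=k)
        for start, value in history.items():
            # Half-open window here; the Splunk search further down
            # uses inclusive bounds on both ends.
            if start <= moment < start + WINDOW:
                samples.append(value)
    return mean(samples) if samples else None

# Hypothetical stored thresholds for the 11:00-15:00 window on four Mondays,
# one of them a spike, plus an unrelated 15:00 window that must be ignored.
history = {
    datetime(2024, 1, 29, 11): 10,
    datetime(2024, 1, 22, 11): 12,
    datetime(2024, 1, 15, 11): 30,   # activity spike
    datetime(2024, 1, 8, 11): 12,
    datetime(2024, 1, 29, 15): 99,   # different time of day
}

print(baseline(datetime(2024, 2, 5, 12, 30), history))   # 16
```

Averaging over four weeks dampens a single spike (30 becomes 16 here) without discarding it entirely.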
This automated machine learning logic can be applied to any application without prior knowledge of its business specifics or seasonal patterns. It adjusts itself to the application and applies meaningful averages at any moment in time, automatically compensating for occasional outliers.
The Solution Steps:
- Generate a summary index of all user sessions on a regular basis.
- Generate a secondary summary index of user session velocity and density for 4-hour periods.
- Update the alerting query to detect abnormal values of session velocity and density.
1. Generating summary index of all user sessions.
This process is very important for two reasons. First, we need it as input for generating the secondary summary index. Note that Splunk’s transaction command is quite slow, so having all sessions generated in advance will save us a lot of time.
Second, once all user sessions are summarized within the index, it is much easier to create a decent investigation and security analytics dashboard that offers deeper insight into a specific user’s behavior and allows quick summaries and stats to be generated for a given business application.
A point to note: we will use a scheduled search to generate the sessions summary index. This will inevitably cause some sessions to be cut apart at the earliest/latest time boundaries, which will somewhat skew the stats. To minimize this, we will run the scheduled search only twice within each 24-hour period.
One of my clients’ portals is an online securities trading application. For that case I chose to position the time boundaries outside trading hours, so that most sessions that could potentially involve securities trading remain intact. Other business cases might require their own adjustments.
Here are the steps to generate the summary index of user sessions twice a day, with the time boundaries always falling outside business hours to minimize data skew:
- Create summary index where user sessions summaries will be stored:
From Splunk menu, select: Settings -> Indexes
- Press [New] button to create summary index for user sessions
- Index name = summary_sessions
- Optional: adjust “Max size (MB) of entire index” if desired
- Press [Save] to create new summary index.
- Create summary index generating scheduled search:
From Splunk menu, select: Settings -> Searches, reports and alerts
- From App context, pick the App you want to create the alert in, or “Search & Reporting” if you don’t have a custom app.
- Click [New] button.
- Search name = Summary: sessions
- Search = see below
- Start time = -12h@h
- Finish time = @h
- [x] Schedule this search
- Schedule type: Cron
- Cron schedule = 15 07,19 * * *
This will run the summary index generation search at 07:15 every day (to cover 19:00-07:00) and at 19:15 (to cover 07:00-19:00)
- Run as = (x) Owner
- Alert condition = always
- Alert mode = Once per search
- Expiration = After 24 hours
- Severity = Medium
- Summary Indexing = [x] Enable
- Select the summary index = summary_sessions (the one created earlier)
- Press [Save]
At this point your summary index generation will be initiated twice a day according to schedule above.
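As a quick sanity check of the schedule, the following Python sketch mimics how the `-12h@h` and `@h` time modifiers resolve for the two daily runs. It is only an illustration; the function name and the example date are arbitrary.

```python
from datetime import datetime, timedelta

def summary_window(run_time):
    """Mimic Start time = -12h@h and Finish time = @h for a given run time."""
    latest = run_time.replace(minute=0, second=0, microsecond=0)   # @h
    earliest = latest - timedelta(hours=12)                        # -12h@h
    return earliest, latest

# The two daily runs from the cron schedule (15 07,19 * * *).
for run in (datetime(2024, 1, 8, 7, 15), datetime(2024, 1, 8, 19, 15)):
    start, end = summary_window(run)
    print(f"run at {run:%H:%M} covers {start:%Y-%m-%d %H:%M} to {end:%Y-%m-%d %H:%M}")
```

Together the two runs cover each 24-hour period exactly once, with the cut points at 07:00 and 19:00.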
Here is the search to generate the summary index for all user sessions:
index=logs | eval mytime=strftime(_time,"%Y-%m-%d %H:%M:%S") | eval pages=page | eval xpages = "["+mytime+"] ["+method+"] ["+code+"] ["+site+"] "+page | transaction session_id maxpause=15m endswith=(page=*/logout*) maxevents=-1 maxopentxn=500000 maxopenevents=20000000 keepevicted=1 mvlist=xpages | eval usernames=username | eval username_lf=lower(mvindex(usernames, 0)) | eval username=username_lf | eval username_ll=lower(mvindex(usernames, -1)) | eval username=if(len(username)>0, username, "") | eval usernames=mvjoin(usernames, "|") | eval ips=ip | eval ip=mvindex(ips, 0) | eval ips=mvjoin(ips, "|") | `iplocationex(ip)` | eval referer = mvindex(referer, 0) | eval uas=ua | eval ua=mvindex(uas, 0) | eval uas=mvjoin(uas, "|") | eval pages=mvjoin(pages, "|") | eval xpages=mvjoin(xpages, "|") | eval sites=site | eval site=mvindex(sites, 0) | eval sites=mvjoin(sites, "|") | eval ht=duration/eventcount | fields _time, evicted, closed_txn, ip, ips, username*, pages, xpages, referer, ua, uas, site, sites, tlsid, pm_fpsc, pm_fpscs, rsatxtDevValue, rsatxtDevValues, Country, Region, City, duration, eventcount | fields - _raw
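For readers less familiar with the transaction command, here is a simplified Python sketch of the grouping idea behind `transaction session_id maxpause=15m endswith=(page=*/logout*)`. It is not how Splunk implements it (the real command has more options and limits); the event tuples and the function name are assumptions made for illustration.

```python
from datetime import datetime, timedelta

MAX_PAUSE = timedelta(minutes=15)

def sessionize(events):
    """Group (time, session_id, page) events into sessions.

    A session closes when the gap to the next hit exceeds MAX_PAUSE
    or when a hit on a logout page is seen.
    """
    open_sessions = {}   # session_id -> events of the currently open session
    closed = []
    for event in sorted(events, key=lambda e: e[0]):
        when, sid, page = event
        current = open_sessions.get(sid)
        if current and when - current[-1][0] > MAX_PAUSE:
            closed.append(open_sessions.pop(sid))   # pause too long
            current = None
        if current is None:
            current = open_sessions.setdefault(sid, [])
        current.append(event)
        if "/logout" in page:
            closed.append(open_sessions.pop(sid))   # explicit logout
    closed.extend(open_sessions.values())   # unfinished sessions are kept too
    return closed

t = datetime(2024, 1, 8, 10, 0)
events = [
    (t, "a", "/login"),
    (t + timedelta(minutes=5), "a", "/balances"),
    (t + timedelta(minutes=30), "a", "/trades"),    # 25-minute gap: new session
    (t + timedelta(minutes=1), "b", "/login"),
    (t + timedelta(minutes=2), "b", "/logout"),
]
sessions = sessionize(events)
print([len(s) for s in sessions])
```

Sessions that are still open at the end of the search window are the ones that get cut apart at the time boundaries, which is the skew discussed earlier.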
It is also a good idea to pre-generate at least 1 month’s worth of summary index data. Use the fill_summary_index.py script to do that. More details about backfilling a summary index are described in one of my previous posts.
But in essence you do it with this command:
splunk cmd python fill_summary_index.py -app your-app-name -name "Summary: sessions" -et -30d -lt -1h@h -dedup true -j 8 -owner admin -auth admin:YouRAdminPasSw0rD
2. Generating secondary summary index of user session velocity and density.
Once we have all sessions collected within the new summary_sessions index, we can generate the secondary summary index of session thresholds for velocity and density. To do that, basically repeat all the steps above with the following differences:
- New, secondary summary index to be named: summary_sessions_thresholds
- Start time and finish time fields need to be empty: these are defined directly within the search (see below).
- Cron schedule = 5 07,11,15,19,23,03 * * *
- Select the summary index = summary_sessions_thresholds.
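The search for this step uses the time modifiers `earliest=-20h@h latest=-16h@h`. The Python sketch below (illustrative only; names and the example date are arbitrary) resolves those modifiers for each of the six daily runs, to show which 4-hour window each run summarizes.

```python
from datetime import datetime, timedelta

RUN_HOURS = (3, 7, 11, 15, 19, 23)   # hours from the cron schedule above

def threshold_window(run_time):
    """Mimic earliest=-20h@h latest=-16h@h for a given run time."""
    top_of_hour = run_time.replace(minute=0, second=0, microsecond=0)
    return top_of_hour - timedelta(hours=20), top_of_hour - timedelta(hours=16)

day = datetime(2024, 1, 8)   # arbitrary example date
windows = [threshold_window(day.replace(hour=h, minute=5)) for h in RUN_HOURS]

for (start, end), hour in zip(windows, RUN_HOURS):
    print(f"run {hour:02d}:05 -> {start:%Y-%m-%d %H:%M} to {end:%Y-%m-%d %H:%M}")
```

The six windows are back to back and together span 24 hours. As far as I can tell from the schedules, the 16-hour lag also means each window has already been written to summary_sessions by one of the twice-daily runs before its thresholds are calculated.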
Search to generate secondary summary index:
index=summary_sessions earliest=-20h@h latest=-16h@h eventcount>2 duration>1 | eval ec=eventcount | join type=left [search index=summary_sessions earliest=-20h@h latest=-16h@h eventcount>1 | eval ec=eventcount | top ec limit=0 | streamstats sum(percent) as percentile | where percentile<=99.7 | eventstats c | eval ec_threshold_3s=c+1 | where percentile<=95 | eventstats c | eval ec_threshold_2s=c+1 | head 1 | fields ec_threshold_2s, ec_threshold_3s ] | where ec<=ec_threshold_3s | eventstats perc1(duration) as duration_1p_per_session, perc5(duration) as duration_5p_per_session, perc95(duration) as duration_95p_per_session by ec | where duration>=duration_5p_per_session AND duration<=duration_95p_per_session | eventstats c as occurences, avg(duration) as duration_per_session by ec | eval secs_1p_per_hit_avg=duration_1p_per_session/ec | eval secs_5p_per_hit_avg=duration_5p_per_session/ec | eval secs_per_hit_avg=duration_per_session/ec | eventstats avg(secs_1p_per_hit_avg) as ht_threshold_3s | eventstats avg(secs_5p_per_hit_avg) as ht_threshold_2s | eventstats avg(secs_per_hit_avg) as ht_threshold_avg | head 1 | addinfo | eval time_range_start=info_min_time | eval time_range_end=info_max_time | eval time_range_start_str=strftime(time_range_start, "%Y-%m-%d %H:%M:%S") | eval time_range_end_str=strftime(time_range_end, "%Y-%m-%d %H:%M:%S") | fields time_range_*, ec_threshold*, ht_threshold* | fields - _raw
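The density part of this search (`top ec limit=0 | streamstats sum(percent) as percentile | where percentile<=... | eventstats c`) can be hard to read, so here is my reading of it as a small Python sketch. It counts how many of the most frequent hit-count values fit within a given share of sessions and adds one. The function name and the sample data are hypothetical, and the sketch ignores the velocity part of the search.

```python
from collections import Counter

def density_threshold(event_counts, coverage):
    """Number of most-frequent eventcount values whose cumulative share
    of sessions stays within `coverage` percent, plus one."""
    total = len(event_counts)
    cumulative = 0.0
    kept = 0
    # most_common() orders values by frequency, like `top ec limit=0`
    for _, sessions in Counter(event_counts).most_common():
        cumulative += 100.0 * sessions / total   # streamstats sum(percent)
        if cumulative > coverage:
            break
        kept += 1
    return kept + 1

# Hypothetical hit counts for 100 sessions (all with more than 1 hit,
# as in the subsearch), including one very dense outlier.
counts = [2] * 50 + [3] * 30 + [4] * 15 + [5] * 4 + [40]

print(density_threshold(counts, 95))     # 4
print(density_threshold(counts, 99.7))   # 5
```

When session frequency falls off steadily with hit count, as in this sample, the result equals the largest hit count still inside the coverage, so the 40-hit session ends up above both thresholds.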
3. Updating alerting query to detect abnormal user session velocity and density.
Here’s an updated user session behavior alert query:
index=logs earliest=-1h | eval mytime=strftime(_time,"%Y-%m-%d %H:%M:%S") | eval pages=page | eval xpages = "["+mytime+"] ["+method+"] ["+code+"] ["+site+"] "+page | transaction session_id maxpause=15m endswith=(page=*/logout*) maxevents=-1 maxopentxn=500000 maxopenevents=20000000 keepevicted=1 mvlist=xpages | eval usernames=username | eval username_lf=lower(mvindex(usernames, 0)) | eval username=username_lf | eval username_ll=lower(mvindex(usernames, -1)) | eval username=if(len(username)>0, username, "") | eval ips=ip | eval ip=mvindex(ips, 0) | eval referer = mvindex(referer, 0) | eval uas=ua | eval ua=mvindex(uas, 0) | eval sites=site | eval site=mvindex(sites, 0) | join type=left [search index=summary_sessions_thresholds earliest=-4w@w | eval week1=now()-(3600*24*7) | eval week2=now()-(3600*24*14) | eval week3=now()-(3600*24*21) | eval week4=now()-(3600*24*28) | where (time_range_start<=week1 AND time_range_end>=week1) OR (time_range_start<=week2 AND time_range_end>=week2) OR (time_range_start<=week3 AND time_range_end>=week3) OR (time_range_start<=week4 AND time_range_end>=week4) | stats avg(ec_threshold_2s) as ec_threshold_2s, avg(ec_threshold_3s) as ec_threshold_3s, avg(ht_threshold_2s) as ht_threshold_2s, avg(ht_threshold_3s) as ht_threshold_3s | fields ec_threshold_2s, ec_threshold_3s, ht_threshold_2s, ht_threshold_3s ] | where eventcount>=5 | eval ht=duration/eventcount | eval riskscore=0 | eval riskmsg="" | eval test=mvfind(xpages, "(?i)\[POST.*?/fundstransfer") | eval addscore=10 | eval riskscore=if(isnull(test), riskscore, riskscore+addscore) | eval riskmsg=if(isnull(test), riskmsg, riskmsg+"|(+"+addscore+") Money movement detected") | eval addscore=15 | eval riskscore=if(isnull(test) OR test>=6, riskscore, riskscore+addscore) | eval riskmsg=if(isnull(test) OR test>=6, riskmsg, riskmsg+"|(+"+addscore+") Immediate Money movement detected") | eval test=mvfind(xpages, "(?i)\[POST.*?/updateuserprofile") | eval addscore=15 | eval riskscore=if(isnull(test), riskscore, riskscore+addscore) | eval riskmsg=if(isnull(test), riskmsg, riskmsg+"|(+"+addscore+") Profile edit detected") | eval addscore=15 | eval riskscore=if(isnull(test) OR test>=6, riskscore, riskscore+addscore) | eval riskmsg=if(isnull(test) OR test>=6, riskmsg, riskmsg+"|(+"+addscore+") Immediate Profile edit detected") | eval test=mvfind(xpages, "(?i)\[POST.*?/updatepassword") | eval addscore=20 | eval riskscore=if(isnull(test), riskscore, riskscore+addscore) | eval riskmsg=if(isnull(test), riskmsg, riskmsg+"|(+"+addscore+") Password update detected") | eval test=mvfind(xpages, "(?i)\[POST.*?/(stock|options)tradeorder") | eval addscore=10 | eval riskscore=if(isnull(test), riskscore, riskscore+addscore) | eval riskmsg=if(isnull(test), riskmsg, riskmsg+"|(+"+addscore+") Security Trading detected") | eval addscore=30 | eval riskscore=if(ht>=ht_threshold_3s, riskscore, riskscore+addscore) | eval riskmsg=if(ht>=ht_threshold_3s, riskmsg, riskmsg+"|(+"+addscore+") Excessive session velocity detected") | eval addscore=30 | eval riskscore=if(eventcount<=ec_threshold_3s, riskscore, riskscore+addscore) | eval riskmsg=if(eventcount<=ec_threshold_3s, riskmsg, riskmsg+"|(+"+addscore+") Excessive session density detected") | where riskscore>=45 | makemv delim="|" riskmsg | iplocation allfields=1 ip | eval Country=if(Country="United States", "USA", Country) | eval Location=Region+" - "+City | table _time, eventcount, duration, ht, ip, Country, Location, username, riskscore, riskmsg, xpages | rename ht as "Avg seconds/hit", ip as IP
The important addition to this query is the pair of average session velocity and density thresholds applicable to the current time, calculated as an average over the same time period of the last 4 weeks.
Two extra risk scores are added: excessive session velocity and excessive session density.
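The two behavioral checks at the end of the alert query boil down to the following logic, shown here as a Python sketch. The function and parameter names are mine; the score of 30 per check and the alert cut-off of 45 are taken from the query.

```python
ALERT_CUTOFF = 45   # from `where riskscore>=45` in the alert query

def behavior_score(eventcount, duration, density_threshold, velocity_threshold):
    """Score a session on velocity and density only (other rules omitted)."""
    seconds_per_hit = duration / eventcount
    score, reasons = 0, []
    if seconds_per_hit < velocity_threshold:   # faster than the baseline floor
        score += 30
        reasons.append("Excessive session velocity detected")
    if eventcount > density_threshold:         # more hits than the baseline cap
        score += 30
        reasons.append("Excessive session density detected")
    return score, reasons

# Hypothetical sessions scored against hypothetical thresholds.
print(behavior_score(200, 100, density_threshold=50, velocity_threshold=2.0))
print(behavior_score(10, 300, density_threshold=50, velocity_threshold=2.0))
```

A session that trips both checks scores 60 and alerts on behavior alone. A session that trips only one scores 30 and still needs another signal, such as a profile edit (+15), to reach the cut-off.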
To be continued …
Gleb Esman is currently working as Senior Product Manager for Security/Anti-fraud solutions at Splunk leading efforts to build next generation security products covering advanced fraud cases across multiple industry verticals.