diff --git a/src/mongo/db/exec/sbe/SConscript b/src/mongo/db/exec/sbe/SConscript index 705758b040b..8c4a392a654 100644 --- a/src/mongo/db/exec/sbe/SConscript +++ b/src/mongo/db/exec/sbe/SConscript @@ -196,6 +196,7 @@ env.CppUnitTest( 'expressions/sbe_get_element_builtin_test.cpp', 'expressions/sbe_if_test.cpp', 'expressions/sbe_index_of_test.cpp', + 'expressions/sbe_integral_test.cpp', 'expressions/sbe_is_array_empty_builtin_test.cpp', 'expressions/sbe_is_member_builtin_test.cpp', 'expressions/sbe_iso_date_to_parts_test.cpp', diff --git a/src/mongo/db/exec/sbe/expressions/expression.cpp b/src/mongo/db/exec/sbe/expressions/expression.cpp index 31cdcaeebb4..704f17e31b4 100644 --- a/src/mongo/db/exec/sbe/expressions/expression.cpp +++ b/src/mongo/db/exec/sbe/expressions/expression.cpp @@ -841,6 +841,12 @@ static stdx::unordered_map kBuiltinFunctions = { BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggRemovableSumRemove, true}}, {"aggRemovableSumFinalize", BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggRemovableSumFinalize, false}}, + {"aggIntegralAdd", + BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::aggIntegralAdd, true}}, + {"aggIntegralRemove", + BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::aggIntegralRemove, true}}, + {"aggIntegralFinalize", + BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggIntegralFinalize, false}}, }; /** diff --git a/src/mongo/db/exec/sbe/expressions/sbe_integral_test.cpp b/src/mongo/db/exec/sbe/expressions/sbe_integral_test.cpp new file mode 100644 index 00000000000..99704ae2fe0 --- /dev/null +++ b/src/mongo/db/exec/sbe/expressions/sbe_integral_test.cpp @@ -0,0 +1,460 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include +#include +#include +#include +#include + +#include "mongo/base/string_data.h" +#include "mongo/db/exec/sbe/accumulator_sum_value_enum.h" +#include "mongo/db/exec/sbe/expression_test_base.h" +#include "mongo/db/exec/sbe/expressions/expression.h" +#include "mongo/db/exec/sbe/values/slot.h" +#include "mongo/db/exec/sbe/values/value.h" +#include "mongo/db/query/collation/collator_interface_mock.h" +#include "mongo/unittest/assert.h" +#include "mongo/unittest/framework.h" + +namespace mongo::sbe { + +enum class IntegralOp { kAdd, kRemove }; + +class SBEIntegralTest : public EExpressionTestFixture { +public: + std::pair initQueue() { + auto [queueTag, queueVal] = value::makeNewArray(); + auto queue = value::getArrayView(queueVal); + auto [queueInternalArrTag, queueInternalArrVal] = value::makeNewArray(); + auto arr = value::getArrayView(queueInternalArrVal); + arr->push_back(value::TypeTags::Null, 0); + queue->push_back(queueInternalArrTag, queueInternalArrVal); + queue->push_back(value::TypeTags::NumberInt64, 0); + queue->push_back(value::TypeTags::NumberInt64, 0); + return {queueTag, queueVal}; + } + + std::pair initState(boost::optional unitMillis) { + auto [stateTag, stateVal] = value::makeNewArray(); + auto state = value::getArrayView(stateVal); + + // input queue + auto [inputQueueTag, inputQueueVal] = initQueue(); + state->push_back(inputQueueTag, inputQueueVal); + + // sortBy queue + auto [sortByQueueTag, sortByQueueVal] = initQueue(); + state->push_back(sortByQueueTag, sortByQueueVal); + + // sum acc state + auto [removableSumAccTag, removableSumAccVal] = value::makeNewArray(); + auto removableSumAcc = value::getArrayView(removableSumAccVal); + auto [sumAccTag, sumAccVal] = value::makeNewArray(); + auto sumAcc = value::getArrayView(sumAccVal); + // DoubleDoubleSum Acc + sumAcc->reserve(AggSumValueElems::kMaxSizeOfArray); + sumAcc->push_back(value::TypeTags::NumberInt32, value::bitcastFrom(0)); + sumAcc->push_back(value::TypeTags::NumberDouble, value::bitcastFrom(0.0)); + sumAcc->push_back(value::TypeTags::NumberDouble, value::bitcastFrom(0.0)); + // RemovableSum Acc + removableSumAcc->push_back(sumAccTag, sumAccVal); + removableSumAcc->push_back(value::TypeTags::NumberInt64, 0); + removableSumAcc->push_back(value::TypeTags::NumberInt64, 0); + removableSumAcc->push_back(value::TypeTags::NumberInt64, 0); + removableSumAcc->push_back(value::TypeTags::NumberInt64, 0); + removableSumAcc->push_back(value::TypeTags::NumberInt64, 0); + state->push_back(removableSumAccTag, removableSumAccVal); + + // nanCount + state->push_back(value::TypeTags::NumberInt64, 0); + + // unitMillis + if (unitMillis) { + state->push_back(value::TypeTags::NumberInt64, + value::bitcastFrom(*unitMillis)); + } else { + state->push_back(value::TypeTags::Null, 0); + } + + return {stateTag, stateVal}; + } + + void runAndAssertExpression(boost::optional unitMillis, + std::vector>& inputValues, + std::vector>& sortByValues, + std::vector& operations, + std::vector>& expValues) { + value::ViewOfValueAccessor inputAccessor; + auto inputSlot = bindAccessor(&inputAccessor); + + value::ViewOfValueAccessor sortByAccessor; + auto sortBySlot = bindAccessor(&sortByAccessor); + + value::OwnedValueAccessor aggAccessor; + auto aggSlot = bindAccessor(&aggAccessor); + + auto aggIntegralAddExpr = sbe::makeE( + "aggIntegralAdd", + sbe::makeEs(makeE(inputSlot), makeE(sortBySlot))); + auto compiledIntegralAdd = compileAggExpression(*aggIntegralAddExpr, &aggAccessor); + + auto aggIntegralRemoveExpr = sbe::makeE( + "aggIntegralRemove", + sbe::makeEs(makeE(inputSlot), makeE(sortBySlot))); + auto compiledIntegralRemove = compileAggExpression(*aggIntegralRemoveExpr, &aggAccessor); + + auto aggIntegralFinalize = sbe::makeE( + "aggIntegralFinalize", sbe::makeEs(makeE(aggSlot))); + auto compiledIntegralFinalize = compileExpression(*aggIntegralFinalize); + + auto [stateTag, stateVal] = initState(unitMillis); + aggAccessor.reset(stateTag, stateVal); + + // call IntegralOp (integralAdd/Remove) on the inputs and call finalize() method after each + // IntegralOp + size_t addIdx = 0, removeIdx = 0; + for (size_t i = 0; i < operations.size(); ++i) { + vm::CodeFragment* compiledExpr; + size_t idx; + if (operations[i] == IntegralOp::kAdd) { + compiledExpr = compiledIntegralAdd.get(); + idx = addIdx++; + } else { + compiledExpr = compiledIntegralRemove.get(); + idx = removeIdx++; + } + inputAccessor.reset(inputValues[idx].first, inputValues[idx].second); + sortByAccessor.reset(sortByValues[idx].first, sortByValues[idx].second); + auto [runTag, runVal] = runCompiledExpression(compiledExpr); + + aggAccessor.reset(runTag, runVal); + auto [outTag, outVal] = runCompiledExpression(compiledIntegralFinalize.get()); + + ASSERT_EQ(expValues[i].first, outTag); + auto [compareTag, compareVal] = + value::compareValue(expValues[i].first, expValues[i].second, outTag, outVal); + ASSERT_EQ(compareTag, value::TypeTags::NumberInt32); + ASSERT_EQ(value::bitcastTo(compareVal), 0); + value::releaseValue(outTag, outVal); + value::releaseValue(expValues[i].first, expValues[i].second); + } + for (size_t i = 0; i < inputValues.size(); ++i) { + value::releaseValue(inputValues[i].first, inputValues[i].second); + value::releaseValue(sortByValues[i].first, sortByValues[i].second); + } + } + + void runAndAssertErrorCode(boost::optional unitMillis, + std::vector>& inputValues, + std::vector>& sortByValues, + std::vector& operations, + int expErrCode) { + value::ViewOfValueAccessor inputAccessor; + auto inputSlot = bindAccessor(&inputAccessor); + + value::ViewOfValueAccessor sortByAccessor; + auto sortBySlot = bindAccessor(&sortByAccessor); + + value::OwnedValueAccessor aggAccessor; + auto aggSlot = bindAccessor(&aggAccessor); + + auto aggIntegralAddExpr = sbe::makeE( + "aggIntegralAdd", + sbe::makeEs(makeE(inputSlot), makeE(sortBySlot))); + auto compiledIntegralAdd = compileAggExpression(*aggIntegralAddExpr, &aggAccessor); + + auto aggIntegralRemoveExpr = sbe::makeE( + "aggIntegralRemove", + sbe::makeEs(makeE(inputSlot), makeE(sortBySlot))); + auto compiledIntegralRemove = compileAggExpression(*aggIntegralRemoveExpr, &aggAccessor); + + auto aggIntegralFinalize = sbe::makeE( + "aggIntegralFinalize", sbe::makeEs(makeE(aggSlot))); + auto compiledIntegralFinalize = compileExpression(*aggIntegralFinalize); + + auto [stateTag, stateVal] = initState(unitMillis); + aggAccessor.reset(stateTag, stateVal); + + Status status = [&]() { + try { + size_t addIdx = 0, removeIdx = 0; + for (size_t i = 0; i < operations.size(); ++i) { + vm::CodeFragment* compiledExpr; + size_t idx; + if (operations[i] == IntegralOp::kAdd) { + compiledExpr = compiledIntegralAdd.get(); + idx = addIdx++; + } else { + compiledExpr = compiledIntegralRemove.get(); + idx = removeIdx++; + } + inputAccessor.reset(inputValues[idx].first, inputValues[idx].second); + sortByAccessor.reset(sortByValues[idx].first, sortByValues[idx].second); + auto [runTag, runVal] = runCompiledExpression(compiledExpr); + aggAccessor.reset(runTag, runVal); + } + return Status::OK(); + } catch (AssertionException& ex) { + return ex.toStatus(); + } + }(); + ASSERT_FALSE(status.isOK()); + ASSERT_EQ(status.code(), expErrCode); + for (size_t i = 0; i < inputValues.size(); ++i) { + value::releaseValue(inputValues[i].first, inputValues[i].second); + value::releaseValue(sortByValues[i].first, sortByValues[i].second); + } + } +}; + +TEST_F(SBEIntegralTest, IntegralAddRemoveOverDate) { + std::vector> inputValues = { + {value::TypeTags::NumberDouble, value::bitcastFrom(2.95)}, + {value::TypeTags::NumberDouble, value::bitcastFrom(2.7)}, + {value::TypeTags::NumberDouble, value::bitcastFrom(2.6)}, + {value::TypeTags::NumberDouble, value::bitcastFrom(2.98)}}; + std::vector> sortByValues = { + {value::TypeTags::Date, 1589811030000LL}, + {value::TypeTags::Date, 1589811060000LL}, + {value::TypeTags::Date, 1589811090000LL}, + {value::TypeTags::Date, 1589811120000LL}}; + + std::vector integralOps = {IntegralOp::kAdd, + IntegralOp::kAdd, + IntegralOp::kAdd, + IntegralOp::kAdd, + IntegralOp::kRemove, + IntegralOp::kRemove, + IntegralOp::kRemove, + IntegralOp::kRemove}; + std::vector> expValues = { + {value::TypeTags::NumberDouble, value::bitcastFrom(0)}, + {value::TypeTags::NumberDouble, value::bitcastFrom(0.023541666666666666)}, + {value::TypeTags::NumberDouble, value::bitcastFrom(0.045625)}, + {value::TypeTags::NumberDouble, value::bitcastFrom(0.068875)}, + {value::TypeTags::NumberDouble, value::bitcastFrom(0.045333333333333337)}, + {value::TypeTags::NumberDouble, value::bitcastFrom(0.02325)}, + {value::TypeTags::NumberDouble, value::bitcastFrom(0)}, + {value::TypeTags::Null, 0}}; + + boost::optional unitMillis = 60LL * 60LL * 1000LL; // hour unit + runAndAssertExpression(unitMillis, inputValues, sortByValues, integralOps, expValues); +} + +TEST_F(SBEIntegralTest, IntegralWithMixedTypes) { + std::vector> inputValues = { + {value::TypeTags::NumberInt32, value::bitcastFrom(10)}, + {value::TypeTags::NumberInt64, value::bitcastFrom(10ll)}, + {value::TypeTags::NumberDouble, value::bitcastFrom(10.0)}, + {value::TypeTags::NumberDecimal, value::makeCopyDecimal(Decimal128{10.0}).second}, + {value::TypeTags::NumberDouble, value::bitcastFrom(10.0)}, + {value::TypeTags::NumberInt64, value::bitcastFrom(10ll)}, + {value::TypeTags::NumberInt32, value::bitcastFrom(10)}, + }; + + std::vector> sortByValues = { + {value::TypeTags::NumberInt32, value::bitcastFrom(1)}, + {value::TypeTags::NumberInt64, value::bitcastFrom(2l)}, + {value::TypeTags::NumberDouble, value::bitcastFrom(3.0)}, + {value::TypeTags::NumberDecimal, value::makeCopyDecimal(Decimal128{4.0}).second}, + {value::TypeTags::NumberDouble, value::bitcastFrom(5.0)}, + {value::TypeTags::NumberInt64, value::bitcastFrom(6ll)}, + {value::TypeTags::NumberInt32, value::bitcastFrom(7)}, + }; + + std::vector integralOps = {IntegralOp::kAdd, + IntegralOp::kAdd, + IntegralOp::kAdd, + IntegralOp::kAdd, + IntegralOp::kAdd, + IntegralOp::kAdd, + IntegralOp::kAdd, + IntegralOp::kRemove, + IntegralOp::kRemove, + IntegralOp::kRemove, + IntegralOp::kRemove, + IntegralOp::kRemove, + IntegralOp::kRemove, + IntegralOp::kRemove}; + + std::vector> expValues = { + {value::TypeTags::NumberInt32, 0}, + {value::TypeTags::NumberDouble, value::bitcastFrom(10.0)}, + {value::TypeTags::NumberDouble, value::bitcastFrom(20.0)}, + {value::TypeTags::NumberDecimal, value::makeCopyDecimal(Decimal128{30.0}).second}, + {value::TypeTags::NumberDecimal, value::makeCopyDecimal(Decimal128{40.0}).second}, + {value::TypeTags::NumberDecimal, value::makeCopyDecimal(Decimal128{50.0}).second}, + {value::TypeTags::NumberDecimal, value::makeCopyDecimal(Decimal128{60.0}).second}, + {value::TypeTags::NumberDecimal, value::makeCopyDecimal(Decimal128{50.0}).second}, + {value::TypeTags::NumberDecimal, value::makeCopyDecimal(Decimal128{40.0}).second}, + {value::TypeTags::NumberDecimal, value::makeCopyDecimal(Decimal128{30.0}).second}, + {value::TypeTags::NumberDouble, value::bitcastFrom(20.0)}, + {value::TypeTags::NumberDouble, value::bitcastFrom(10.0)}, + {value::TypeTags::NumberInt32, 0}, + {value::TypeTags::Null, 0}, + }; + + runAndAssertExpression(boost::none, inputValues, sortByValues, integralOps, expValues); +} + +TEST_F(SBEIntegralTest, IntegralWithNaNAndInfinityValues) { + std::vector> inputValues = { + {value::TypeTags::NumberInt64, 10}, + {value::TypeTags::NumberDouble, + value::bitcastFrom(std::numeric_limits::quiet_NaN())}, + {value::TypeTags::NumberDecimal, value::makeCopyDecimal(Decimal128::kNegativeNaN).second}, + {value::TypeTags::NumberInt64, 20}, + {value::TypeTags::NumberDouble, + value::bitcastFrom(std::numeric_limits::infinity())}, + {value::TypeTags::NumberDecimal, + value::makeCopyDecimal(Decimal128::kNegativeInfinity).second}, + {value::TypeTags::NumberInt64, 30}, + {value::TypeTags::NumberInt64, 40}, + }; + + std::vector> sortByValues = { + {value::TypeTags::NumberInt64, value::bitcastFrom(1)}, + {value::TypeTags::NumberInt64, value::bitcastFrom(2)}, + {value::TypeTags::NumberInt64, value::bitcastFrom(3)}, + {value::TypeTags::NumberInt64, value::bitcastFrom(4)}, + {value::TypeTags::NumberInt64, value::bitcastFrom(5)}, + {value::TypeTags::NumberInt64, value::bitcastFrom(6)}, + {value::TypeTags::NumberInt64, value::bitcastFrom(7)}, + {value::TypeTags::NumberInt64, value::bitcastFrom(8)}, + }; + + std::vector integralOps = {IntegralOp::kAdd, + IntegralOp::kAdd, + IntegralOp::kAdd, + IntegralOp::kAdd, + IntegralOp::kAdd, + IntegralOp::kAdd, + IntegralOp::kAdd, + IntegralOp::kAdd, + IntegralOp::kRemove, + IntegralOp::kRemove, + IntegralOp::kRemove, + IntegralOp::kRemove, + IntegralOp::kRemove, + IntegralOp::kRemove, + IntegralOp::kRemove, + IntegralOp::kRemove}; + + std::vector> expValues = { + {value::TypeTags::NumberInt32, 0}, + {value::TypeTags::NumberDouble, + value::bitcastFrom(std::numeric_limits::quiet_NaN())}, + {value::TypeTags::NumberDouble, + value::bitcastFrom(std::numeric_limits::quiet_NaN())}, + {value::TypeTags::NumberDouble, + value::bitcastFrom(std::numeric_limits::quiet_NaN())}, + {value::TypeTags::NumberDouble, + value::bitcastFrom(std::numeric_limits::quiet_NaN())}, + {value::TypeTags::NumberDouble, + value::bitcastFrom(std::numeric_limits::quiet_NaN())}, + {value::TypeTags::NumberDouble, + value::bitcastFrom(std::numeric_limits::quiet_NaN())}, + {value::TypeTags::NumberDouble, + value::bitcastFrom(std::numeric_limits::quiet_NaN())}, + {value::TypeTags::NumberDouble, + value::bitcastFrom(std::numeric_limits::quiet_NaN())}, + {value::TypeTags::NumberDouble, + value::bitcastFrom(std::numeric_limits::quiet_NaN())}, + {value::TypeTags::NumberDecimal, value::makeCopyDecimal(Decimal128::kPositiveNaN).second}, + {value::TypeTags::NumberDecimal, value::makeCopyDecimal(Decimal128::kPositiveNaN).second}, + {value::TypeTags::NumberDecimal, + value::makeCopyDecimal(Decimal128::kNegativeInfinity).second}, + {value::TypeTags::NumberDouble, value::bitcastFrom(35.0)}, + {value::TypeTags::NumberInt32, value::bitcastFrom(0)}, + {value::TypeTags::Null, 0}, + }; + + runAndAssertExpression(boost::none, inputValues, sortByValues, integralOps, expValues); +} + +TEST_F(SBEIntegralTest, IntegralWithDatesAndNoUnit) { + std::vector> inputValues = { + {value::TypeTags::NumberDouble, value::bitcastFrom(2.95)}, + {value::TypeTags::NumberDouble, value::bitcastFrom(2.98)}}; + std::vector> sortByValues = { + {value::TypeTags::Date, 1589811030000LL}, {value::TypeTags::Date, 1589811060000LL}}; + + std::vector integralOps = { + IntegralOp::kAdd, IntegralOp::kAdd, IntegralOp::kRemove, IntegralOp::kRemove}; + + runAndAssertErrorCode(boost::none, inputValues, sortByValues, integralOps, 7821111); +} + +TEST_F(SBEIntegralTest, IntegralWithNumbersAndUnit) { + std::vector> inputValues = { + {value::TypeTags::NumberInt32, value::bitcastFrom(10)}, + {value::TypeTags::NumberInt64, value::bitcastFrom(10ll)}, + }; + + std::vector> sortByValues = { + {value::TypeTags::NumberInt32, value::bitcastFrom(1)}, + {value::TypeTags::NumberInt64, value::bitcastFrom(2l)}, + }; + + std::vector integralOps = { + IntegralOp::kAdd, IntegralOp::kAdd, IntegralOp::kRemove, IntegralOp::kRemove}; + + boost::optional unitMillis = 60LL * 60LL * 1000LL; + runAndAssertErrorCode(unitMillis, inputValues, sortByValues, integralOps, 7821110); +} + +TEST_F(SBEIntegralTest, IntegralWithIncorrectTypes1) { + std::vector> inputValues = { + {value::TypeTags::StringSmall, value::makeSmallString("a").second}, + }; + + std::vector> sortByValues = { + {value::TypeTags::NumberInt32, value::bitcastFrom(1)}, + }; + + std::vector integralOps = {IntegralOp::kAdd}; + + runAndAssertErrorCode(boost::none, inputValues, sortByValues, integralOps, 7821109); +} + +TEST_F(SBEIntegralTest, IntegralWithIncorrectTypes2) { + std::vector> inputValues = { + {value::TypeTags::NumberInt32, value::bitcastFrom(1)}, + }; + + std::vector> sortByValues = { + {value::TypeTags::StringSmall, value::makeSmallString("a").second}, + }; + + std::vector integralOps = {IntegralOp::kAdd}; + + runAndAssertErrorCode(boost::none, inputValues, sortByValues, integralOps, 7821111); +} +} // namespace mongo::sbe diff --git a/src/mongo/db/exec/sbe/vm/vm.cpp b/src/mongo/db/exec/sbe/vm/vm.cpp index faaee9aec1d..a5535221013 100644 --- a/src/mongo/db/exec/sbe/vm/vm.cpp +++ b/src/mongo/db/exec/sbe/vm/vm.cpp @@ -7072,6 +7072,384 @@ FastTuple ByteCode::builtinAggRemovableSumF return aggRemovableSumFinalizeImpl(state); } +/** + * Functions that operate on `ArrayQueue` + */ +// Get the underlying array, and start index and end index that demarcates the queue +std::tuple getArrayQueueState(value::Array* arrayQueue) { + auto [arrayTag, arrayVal] = arrayQueue->getAt(static_cast(ArrayQueueElems::kArray)); + uassert(7821100, "Expected an array", arrayTag == value::TypeTags::Array); + auto array = value::getArrayView(arrayVal); + auto size = array->size(); + uassert(7821116, "Expected non-empty array", size > 0); + + auto [startIdxTag, startIdxVal] = + arrayQueue->getAt(static_cast(ArrayQueueElems::kStartIdx)); + uassert(7821101, "Expected NumberInt64 type", startIdxTag == value::TypeTags::NumberInt64); + auto startIdx = value::bitcastTo(startIdxVal); + uassert(7821114, + str::stream() << "Invalid startIdx " << startIdx << " with array size " << size, + startIdx < size); + + auto [queueSizeTag, queueSizeVal] = + arrayQueue->getAt(static_cast(ArrayQueueElems::kQueueSize)); + uassert(7821102, "Expected NumberInt64 type", queueSizeTag == value::TypeTags::NumberInt64); + auto queueSize = value::bitcastTo(queueSizeVal); + uassert(7821115, + str::stream() << "Invalid queueSize " << queueSize << " with array size " << size, + queueSize <= size); + + return {array, startIdx, queueSize}; +} + +// Update the startIdex and index of the `ArrayQueue` +void updateArrayQueueState(value::Array* arrayQueue, size_t startIdx, size_t queueSize) { + arrayQueue->setAt(static_cast(ArrayQueueElems::kStartIdx), + value::TypeTags::NumberInt64, + value::bitcastFrom(startIdx)); + arrayQueue->setAt(static_cast(ArrayQueueElems::kQueueSize), + value::TypeTags::NumberInt64, + value::bitcastFrom(queueSize)); +} + +// Return the size of the queue +size_t arrayQueueSize(value::Array* arrayQueue) { + auto [array, startIdx, queueSize] = getArrayQueueState(arrayQueue); + return queueSize; +} + +// Push an element {tag, value} into the queue +void arrayQueuePush(value::Array* arrayQueue, value::TypeTags tag, value::Value val) { + /* The underlying array acts as a circular buffer for the queue with `startIdx` and `queueSize` + * demarcating the filled region (with remaining region containing nulls). When pushing an + * element to the queue, we set at the corresponding index [= (startIdx + queueSize) % + * arraySize] the element to be added. If the underlying array is filled, we double the size of + * the array (by adding nulls); the existing elements in the queue may need to be rearranged + * when that happens. + * + * Eg, Push {v} : + * => Initial State: (x = filled; _ = empty) + * [x x x x] + * | + * startIdx (queueSize = 4, arraySize = 4) + * + * => Double array size: + * [x x x x _ _ _ _] + * | + * startIdx (queueSize = 4, arraySize = 8) + * + * => Rearrange elements: + * [x x _ _ _ _ x x] + * | + * startIdx (queueSize = 4, arraySize = 8) + * + * => Add element: + * [x x v _ _ _ x x] + * | + * startIdx (queueSize = 5, arraySize = 8) + */ + value::ValueGuard guard{tag, val}; + auto [array, startIdx, queueSize] = getArrayQueueState(arrayQueue); + auto cap = array->size(); + + if (queueSize == cap) { + // reallocate with twice size + auto newCap = cap * 2; + array->reserve(newCap); + auto extend = newCap - cap; + + for (size_t i = 0; i < extend; ++i) { + array->push_back(value::TypeTags::Null, 0); + } + + if (startIdx > 0) { + // existing values wrap over the array + // need to rearrange the values from [startIdx, cap-1] + for (size_t from = cap - 1, to = newCap - 1; from >= startIdx; --from, --to) { + auto [movTag, movVal] = array->swapAt(from, value::TypeTags::Null, 0); + array->setAt(to, movTag, movVal); + } + startIdx += extend; + } + cap = newCap; + } + + auto endIdx = (startIdx + queueSize) % cap; + guard.reset(); + array->setAt(endIdx, tag, val); + updateArrayQueueState(arrayQueue, startIdx, queueSize + 1); +} + +/* Pops an element {tag, value} from the queue and returns it */ +std::pair arrayQueuePop(value::Array* arrayQueue) { + auto [array, startIdx, queueSize] = getArrayQueueState(arrayQueue); + if (queueSize == 0) { + return {value::TypeTags::Nothing, 0}; + } + auto cap = array->size(); + auto pair = array->swapAt(startIdx, value::TypeTags::Null, 0); + + startIdx = (startIdx + 1) % cap; + updateArrayQueueState(arrayQueue, startIdx, queueSize - 1); + return pair; +} + +std::pair arrayQueueFront(value::Array* arrayQueue) { + auto [array, startIdx, queueSize] = getArrayQueueState(arrayQueue); + if (queueSize == 0) { + return {value::TypeTags::Nothing, 0}; + } + return array->getAt(startIdx); +} + +std::pair arrayQueueBack(value::Array* arrayQueue) { + auto [array, startIdx, queueSize] = getArrayQueueState(arrayQueue); + if (queueSize == 0) { + return {value::TypeTags::Nothing, 0}; + } + auto cap = array->size(); + auto endIdx = (startIdx + queueSize - 1) % cap; + return array->getAt(endIdx); +} + +/** + * Helper functions for integralAdd/Remove/Finalize + */ +std::tuple> +getIntegralState(value::TypeTags stateTag, value::Value stateVal) { + uassert( + 7821103, "The accumulator state should be an array", stateTag == value::TypeTags::Array); + auto state = value::getArrayView(stateVal); + + auto maxSize = static_cast(AggIntegralElems::kMaxSizeOfArray); + uassert(7821104, + "The accumulator state should have correct number of elements", + state->size() == maxSize); + + auto [inputQueueTag, inputQueueVal] = + state->getAt(static_cast(AggIntegralElems::kInputQueue)); + uassert(7821105, "InputQueue should be of array type", inputQueueTag == value::TypeTags::Array); + auto inputQueue = value::getArrayView(inputQueueVal); + + auto [sortByQueueTag, sortByQueueVal] = + state->getAt(static_cast(AggIntegralElems::kSortByQueue)); + uassert( + 7821121, "SortByQueue should be of array type", sortByQueueTag == value::TypeTags::Array); + auto sortByQueue = value::getArrayView(sortByQueueVal); + + auto [integralTag, integralVal] = + state->getAt(static_cast(AggIntegralElems::kIntegral)); + uassert(7821106, "Integral should be of array type", integralTag == value::TypeTags::Array); + auto integral = value::getArrayView(integralVal); + + auto [nanCountTag, nanCountVal] = + state->getAt(static_cast(AggIntegralElems::kNanCount)); + uassert(7821107, + "nanCount should be of NumberInt64 type", + nanCountTag == value::TypeTags::NumberInt64); + auto nanCount = value::bitcastTo(nanCountVal); + + boost::optional unitMillis; + auto [unitMillisTag, unitMillisVal] = + state->getAt(static_cast(AggIntegralElems::kUnitMillis)); + if (unitMillisTag != value::TypeTags::Null) { + uassert(7821108, + "unitMillis should be of type NumberInt64", + unitMillisTag == value::TypeTags::NumberInt64); + unitMillis = value::bitcastTo(unitMillisVal); + } + + return {state, inputQueue, sortByQueue, integral, nanCount, unitMillis}; +} + +void updateNaNCount(value::Array* state, int64_t nanCount) { + state->setAt(static_cast(AggIntegralElems::kNanCount), + value::TypeTags::NumberInt64, + value::bitcastFrom(nanCount)); +} + +void assertTypesForIntegeral(value::TypeTags inputTag, + value::TypeTags sortByTag, + boost::optional unitMillis) { + uassert(7821109, "input value should be of numberic type", value::isNumber(inputTag)); + if (unitMillis) { + uassert(7821110, + "Sort-by value should be of date type when unitMillis is provided", + sortByTag == value::TypeTags::Date); + } else { + uassert(7821111, "Sort-by value should be of numeric type", value::isNumber(sortByTag)); + } +} + +FastTuple ByteCode::integralOfTwoPointsByTrapezoidalRule( + std::pair prevInput, + std::pair prevSortByVal, + std::pair newInput, + std::pair newSortByVal) { + if (value::isNaN(prevInput.first, prevInput.second) || + value::isNaN(prevSortByVal.first, prevSortByVal.second) || + value::isNaN(newInput.first, newInput.second) || + value::isNaN(newSortByVal.first, newSortByVal.second)) { + return {false, value::TypeTags::NumberInt64, 0}; + } + + if ((prevSortByVal.first == value::TypeTags::Date && + newSortByVal.first == value::TypeTags::Date) || + (value::isNumber(prevSortByVal.first) && value::isNumber(newSortByVal.first))) { + auto [deltaOwned, deltaTag, deltaVal] = genericSub( + newSortByVal.first, newSortByVal.second, prevSortByVal.first, prevSortByVal.second); + value::ValueGuard deltaGuard{deltaOwned, deltaTag, deltaVal}; + + auto [sumYOwned, sumYTag, sumYVal] = + genericAdd(newInput.first, newInput.second, prevInput.first, prevInput.second); + value::ValueGuard sumYGuard{sumYOwned, sumYTag, sumYVal}; + + auto [integralOwned, integralTag, integralVal] = + genericMul(sumYTag, sumYVal, deltaTag, deltaVal); + value::ValueGuard integralGuard{integralOwned, integralTag, integralVal}; + + auto result = genericDiv( + integralTag, integralVal, value::TypeTags::NumberInt64, value::bitcastFrom(2)); + return result; + } else { + return {false, value::TypeTags::NumberInt64, 0}; + } +} + +FastTuple ByteCode::builtinAggIntegralAdd(ArityType arity) { + auto [stateTag, stateVal] = moveOwnedFromStack(0); + auto [inputTag, inputVal] = moveOwnedFromStack(1); + auto [sortByTag, sortByVal] = moveOwnedFromStack(2); + + value::ValueGuard stateGuard{stateTag, stateVal}; + value::ValueGuard inputGuard{inputTag, inputVal}; + value::ValueGuard sortByGuard{sortByTag, sortByVal}; + + auto [state, inputQueue, sortByQueue, integral, nanCount, unitMillis] = + getIntegralState(stateTag, stateVal); + + assertTypesForIntegeral(inputTag, sortByTag, unitMillis); + + if (value::isNaN(inputTag, inputVal) || value::isNaN(sortByTag, sortByVal)) { + nanCount++; + updateNaNCount(state, nanCount); + } + + auto queueSize = arrayQueueSize(inputQueue); + uassert(7821119, "Queue sizes should match", queueSize == arrayQueueSize(sortByQueue)); + if (queueSize > 0) { + auto inputBack = arrayQueueBack(inputQueue); + auto sortByBack = arrayQueueBack(sortByQueue); + + auto [integralDeltaOwned, integralDeltaTag, integralDeltaVal] = + integralOfTwoPointsByTrapezoidalRule( + inputBack, sortByBack, {inputTag, inputVal}, {sortByTag, sortByVal}); + value::ValueGuard integralDeltaGuard{ + integralDeltaOwned, integralDeltaTag, integralDeltaVal}; + aggRemovableSumImpl<1>(integral, integralDeltaTag, integralDeltaVal); + } + + inputGuard.reset(); + arrayQueuePush(inputQueue, inputTag, inputVal); + + sortByGuard.reset(); + arrayQueuePush(sortByQueue, sortByTag, sortByVal); + + stateGuard.reset(); + return {true, stateTag, stateVal}; +} + +FastTuple ByteCode::builtinAggIntegralRemove(ArityType arity) { + auto [stateTag, stateVal] = moveOwnedFromStack(0); + auto [inputOwned, inputTag, inputVal] = getFromStack(1); + auto [sortByOwned, sortByTag, sortByVal] = getFromStack(2); + + value::ValueGuard stateGuard{stateTag, stateVal}; + + auto [state, inputQueue, sortByQueue, integral, nanCount, unitMillis] = + getIntegralState(stateTag, stateVal); + + assertTypesForIntegeral(inputTag, sortByTag, unitMillis); + + // verify that the input and sortby value to be removed are the first elements of the queues + auto [frontInputTag, frontInputVal] = arrayQueuePop(inputQueue); + value::ValueGuard frontInputGuard{frontInputTag, frontInputVal}; + auto [cmpTag, cmpVal] = value::compareValue(frontInputTag, frontInputVal, inputTag, inputVal); + uassert(7821113, + "Attempted to remove unexpected input value", + cmpTag == value::TypeTags::NumberInt32 && value::bitcastTo(cmpVal) == 0); + + auto [frontSortByTag, frontSortByVal] = arrayQueuePop(sortByQueue); + value::ValueGuard frontSortByGuard{frontSortByTag, frontSortByVal}; + std::tie(cmpTag, cmpVal) = + value::compareValue(frontSortByTag, frontSortByVal, sortByTag, sortByVal); + uassert(7821117, + "Attempted to remove unexpected sortby value", + cmpTag == value::TypeTags::NumberInt32 && value::bitcastTo(cmpVal) == 0); + + if (value::isNaN(inputTag, inputVal) || value::isNaN(sortByTag, sortByVal)) { + nanCount--; + updateNaNCount(state, nanCount); + } + + auto queueSize = arrayQueueSize(inputQueue); + uassert(7821120, "Queue sizes should match", queueSize == arrayQueueSize(sortByQueue)); + if (queueSize > 0) { + auto inputPair = arrayQueueFront(inputQueue); + auto sortByPair = arrayQueueFront(sortByQueue); + + auto [integralDeltaOwned, integralDeltaTag, integralDeltaVal] = + integralOfTwoPointsByTrapezoidalRule( + {inputTag, inputVal}, {sortByTag, sortByVal}, inputPair, sortByPair); + value::ValueGuard integralDeltaGuard{ + integralDeltaOwned, integralDeltaTag, integralDeltaVal}; + aggRemovableSumImpl<-1>(integral, integralDeltaTag, integralDeltaVal); + } + + stateGuard.reset(); + return {true, stateTag, stateVal}; +} + +FastTuple ByteCode::builtinAggIntegralFinalize( + ArityType arity) { + auto [stateOwned, stateTag, stateVal] = getFromStack(0); + + auto [state, inputQueue, sortByQueue, integral, nanCount, unitMillis] = + getIntegralState(stateTag, stateVal); + + auto queueSize = arrayQueueSize(inputQueue); + uassert(7821118, "Queue sizes should match", queueSize == arrayQueueSize(sortByQueue)); + if (queueSize == 0) { + return {false, value::TypeTags::Null, 0}; + } + + if (nanCount > 0) { + return {false, + value::TypeTags::NumberDouble, + value::bitcastFrom(std::numeric_limits::quiet_NaN())}; + } + + auto [resultOwned, resultTag, resultVal] = aggRemovableSumFinalizeImpl(integral); + value::ValueGuard resultGuard{resultOwned, resultTag, resultVal}; + if (unitMillis) { + auto [divResultOwned, divResultTag, divResultVal] = + genericDiv(resultTag, + resultVal, + value::TypeTags::NumberInt64, + value::bitcastFrom(*unitMillis)); + return {divResultOwned, divResultTag, divResultVal}; + } else { + resultGuard.reset(); + return {resultOwned, resultTag, resultVal}; + } +} + + FastTuple ByteCode::dispatchBuiltin(Builtin f, ArityType arity) { switch (f) { @@ -7404,6 +7782,12 @@ FastTuple ByteCode::dispatchBuiltin(Builtin return builtinAggRemovableSum<-1 /*sign*/>(arity); case Builtin::aggRemovableSumFinalize: return builtinAggRemovableSumFinalize(arity); + case Builtin::aggIntegralAdd: + return builtinAggIntegralAdd(arity); + case Builtin::aggIntegralRemove: + return builtinAggIntegralRemove(arity); + case Builtin::aggIntegralFinalize: + return builtinAggIntegralFinalize(arity); } MONGO_UNREACHABLE; @@ -7742,6 +8126,12 @@ std::string builtinToString(Builtin b) { return "aggRemovableSumRemove"; case Builtin::aggRemovableSumFinalize: return "aggRemovableSumFinalize"; + case Builtin::aggIntegralAdd: + return "aggIntegralAdd"; + case Builtin::aggIntegralRemove: + return "aggIntegralRemove"; + case Builtin::aggIntegralFinalize: + return "aggIntegralFinalize"; default: MONGO_UNREACHABLE; } diff --git a/src/mongo/db/exec/sbe/vm/vm.h b/src/mongo/db/exec/sbe/vm/vm.h index 5c51e7ecadf..f283139c12e 100644 --- a/src/mongo/db/exec/sbe/vm/vm.h +++ b/src/mongo/db/exec/sbe/vm/vm.h @@ -813,6 +813,9 @@ enum class Builtin : uint8_t { aggRemovableSumAdd, aggRemovableSumRemove, aggRemovableSumFinalize, + aggIntegralAdd, + aggIntegralRemove, + aggIntegralFinalize, }; std::string builtinToString(Builtin b); @@ -975,6 +978,34 @@ enum class AggRemovableSumElems { kSizeOfArray }; +/** + * This enum defines indices into an 'Array' that stores the state for $integral accumulator + * Element at `kInputQueue` stores the queue of input values + * Element at `kSortByQueue` stores the queue of sortBy values + * Element at `kIntegral` stores the integral over the current window + * Element at `kNanCount` stores the count of NaN values encountered + * Element at `kunitMillis` stores the date unit (Null if not valid) + */ +enum class AggIntegralElems { + kInputQueue, + kSortByQueue, + kIntegral, + kNanCount, + kUnitMillis, + kMaxSizeOfArray +}; + +/** + * This enum defines indices into an 'Array' that stores the state for a queue backed by a + * circular array + * Element at `kArray` stores the underlying array thats holds the elements. This should be + * initialized to a non-zero size initially. + * Element at `kStartIdx` stores the start position of the queue + * Element at `kQueueSize` stores the size of the queue + * The empty values in the array are filled with Null + */ +enum class ArrayQueueElems { kArray, kStartIdx, kQueueSize }; + using SmallArityType = uint8_t; using ArityType = uint32_t; @@ -1797,6 +1828,15 @@ private: void updateRemovableSumAccForIntegerType(value::Array* sumAcc, value::TypeTags rhsTag, value::Value rhsVal); + FastTuple builtinAggIntegralAdd(ArityType arity); + FastTuple builtinAggIntegralRemove(ArityType arity); + FastTuple builtinAggIntegralFinalize(ArityType arity); + FastTuple integralOfTwoPointsByTrapezoidalRule( + std::pair prevInput, + std::pair prevSortByVal, + std::pair newInput, + std::pair newSortByVal); + FastTuple dispatchBuiltin(Builtin f, ArityType arity);