001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.stream.duct;
023
024import java.util.Iterator;
025
026/**
027 *
028 */
029public class OpenReducingDuct<Incoming, Outgoing> extends Duct<Grouping<Incoming, Iterator<Incoming>>, Outgoing> implements OpenWindow
030  {
031  final Reducing reducing;
032
033  public OpenReducingDuct( Duct<Outgoing, ?> next )
034    {
035    super( next );
036
037    reducing = (Reducing) getNext();
038    }
039
040  @Override
041  public void receive( Duct previous, int ordinal, Grouping<Incoming, Iterator<Incoming>> grouping )
042    {
043    // don't start a grouping if there are no values in the group
044    if( !grouping.joinIterator.hasNext() )
045      return;
046
047    reducing.startGroup( previous, grouping.key );
048
049    while( grouping.joinIterator.hasNext() )
050      next.receive( this, 0, (Outgoing) grouping.joinIterator.next() );
051
052    reducing.completeGroup( previous, grouping.key );
053    }
054  }